panlinlin
2021-01-15 1b44ba33671e27c3d206d875306b226c770b7980
完成向上级联->点播--002
7个文件已修改
1个文件已添加
266 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java 80 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java 54 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/play/bean/PlayResult.java 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/service/IPlayService.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java 75 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
@@ -15,6 +15,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.response.impl.*;
import com.genersoft.iot.vmp.gb28181.transmit.response.impl.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.vmanager.service.IPlayService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -103,6 +104,9 @@
    @Autowired
    private OtherResponseProcessor otherResponseProcessor;
    @Autowired
    private IPlayService playService;
    // 注:这里使用注解会导致循环依赖注入,暂用springBean
    private SipProvider tcpSipProvider;
@@ -120,7 +124,9 @@
            processor.setTcpSipProvider(getTcpSipProvider());
            processor.setUdpSipProvider(getUdpSipProvider());
            processor.setCmder(cmder);
            processor.setCmderFroPlatform(cmderFroPlatform);
            processor.setPlayService(playService);
            processor.setStorager(storager);
            return processor;
        } else if (Request.REGISTER.equals(method)) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java
@@ -10,19 +10,26 @@
import javax.sip.message.Request;
import javax.sip.message.Response;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.sdp.Codec;
import com.genersoft.iot.vmp.gb28181.sdp.MediaDescription;
import com.genersoft.iot.vmp.gb28181.sdp.SdpParser;
import com.genersoft.iot.vmp.gb28181.sdp.SessionDescription;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult;
import com.genersoft.iot.vmp.vmanager.service.IPlayService;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
import java.text.ParseException;
@@ -40,6 +47,10 @@
    private SIPCommanderFroPlatform cmderFroPlatform;
    private IVideoManagerStorager storager;
    private SIPCommander cmder;
    private IPlayService playService;
    /**
     * 处理invite请求
@@ -119,7 +130,30 @@
            String ssrc = sdp.getSsrc();
            Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(platformId, channelId);
            if (device == null) {
                logger.warn("点播平台{}的通道{}时未找到设备信息", platformId, channel);
                response500Ack(evt);
                return;
            }
            // 通知下级推流,
            PlayResult playResult = playService.play(device.getDeviceId(), channelId, (response)->{
                // 收到推流, 回复200OK
            },(event -> {
                // 未知错误。直接转发设备点播的错误
                Response response = null;
                try {
                    response = getMessageFactory().createResponse(event.getResponse().getStatusCode(), evt.getRequest());
                    getServerTransaction(evt).sendResponse(response);
                } catch (ParseException | SipException | InvalidArgumentException e) {
                    e.printStackTrace();
                }
            }));
            playResult.getResult();
            // 查找合适的端口推流,
            // 发送 200ok
            // 收到ack后调用推流接口
@@ -149,14 +183,16 @@
    }
    /***
     * 回复404
     * 回复200 OK
     * @param evt
     * @throws SipException
     * @throws InvalidArgumentException
     * @throws ParseException
     */
    private void response404Ack(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException {
        Response response = getMessageFactory().createResponse(Response.NOT_FOUND, evt.getRequest());
    private void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
        Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
        ContentTypeHeader contentTypeHeader = getHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
        response.setContent(sdp, contentTypeHeader);
        getServerTransaction(evt).sendResponse(response);
    }
@@ -173,6 +209,18 @@
    }
    /***
     * 回复404
     * @param evt
     * @throws SipException
     * @throws InvalidArgumentException
     * @throws ParseException
     */
    private void response404Ack(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException {
        Response response = getMessageFactory().createResponse(Response.NOT_FOUND, evt.getRequest());
        getServerTransaction(evt).sendResponse(response);
    }
    /***
     * 回复488
     * @param evt
     * @throws SipException
@@ -185,18 +233,18 @@
    }
    /***
     * 回复200 OK
     * 回复500
     * @param evt
     * @throws SipException
     * @throws InvalidArgumentException
     * @throws ParseException
     */
    private void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
        Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
        ContentTypeHeader contentTypeHeader = getHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
        response.setContent(sdp, contentTypeHeader);
    private void response500Ack(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException {
        Response response = getMessageFactory().createResponse(Response.SERVER_INTERNAL_ERROR, evt.getRequest());
        getServerTransaction(evt).sendResponse(response);
    }
@@ -222,4 +270,20 @@
    public void setStorager(IVideoManagerStorager storager) {
        this.storager = storager;
    }
    /**
     * Returns the {@link SIPCommander} currently assigned to this processor.
     *
     * @return the commander set via {@link #setCmder(SIPCommander)}, or {@code null} if none was set
     */
    public SIPCommander getCmder() {
        return cmder;
    }
    /**
     * Assigns the {@link SIPCommander} held by this processor.
     *
     * @param cmder the commander to store
     */
    public void setCmder(SIPCommander cmder) {
        this.cmder = cmder;
    }
    /**
     * Returns the {@link IPlayService} currently assigned to this processor.
     *
     * @return the play service set via {@link #setPlayService(IPlayService)}, or {@code null} if none was set
     */
    public IPlayService getPlayService() {
        return playService;
    }
    /**
     * Assigns the {@link IPlayService} this processor uses to start play on a device channel.
     *
     * @param playService the play service to store
     */
    public void setPlayService(IPlayService playService) {
        this.playService = playService;
    }
}
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java
@@ -234,4 +234,6 @@
    DeviceChannel queryChannelInParentPlatform(String platformId, String channelId);
    Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
@@ -335,4 +335,9 @@
        DeviceChannel channel = patformChannelMapper.queryChannelInParentPlatform(platformId, channelId);
        return channel;
    }
    /**
     * Placeholder for looking up the device behind a channel shared with a parent platform.
     *
     * NOTE(review): this is a stub - it ignores both arguments and always returns {@code null}.
     * The INVITE handler treats a {@code null} device as "device not found" and answers 500,
     * so play requests from a parent platform cannot succeed until a real lookup is implemented.
     *
     * @param platformId id of the parent platform (currently unused)
     * @param channelId  id of the requested channel (currently unused)
     * @return always {@code null}
     */
    @Override
    public Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId) {
        return null;
    }
}
src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java
@@ -10,6 +10,7 @@
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult;
import com.genersoft.iot.vmp.vmanager.service.IPlayService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,62 +65,19 @@
                                                       @PathVariable String channelId) {
        Device device = storager.queryVideoDevice(deviceId);
        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
        UUID uuid = UUID.randomUUID();
        DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>();
        // 录像查询以channelId作为deviceId查询
        resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result);
        if (streamInfo == null) {
            // 发送点播消息
            cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
                logger.info("收到订阅消息: " + response.toJSONString());
                playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
            }, event -> {
                RequestMessage msg = new RequestMessage();
                msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
                Response response = event.getResponse();
                msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
                resultHolder.invokeResult(msg);
            });
        } else {
            String streamId = streamInfo.getStreamId();
            JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(streamId);
            if (rtpInfo.getBoolean("exist")) {
                RequestMessage msg = new RequestMessage();
                msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
                msg.setData(JSON.toJSONString(streamInfo));
                resultHolder.invokeResult(msg);
            } else {
                redisCatchStorage.stopPlay(streamInfo);
                storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
                cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
                    logger.info("收到订阅消息: " + response.toJSONString());
                    playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
                }, event -> {
                    RequestMessage msg = new RequestMessage();
                    msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
                    Response response = event.getResponse();
                    msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
                    resultHolder.invokeResult(msg);
                });
            }
        }
        PlayResult playResult = playService.play(deviceId, channelId, null, null);
        // 超时处理
        result.onTimeout(()->{
        playResult.getResult().onTimeout(()->{
            logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
            // 释放rtpserver
            cmder.closeRTPServer(device, channelId);
            cmder.closeRTPServer(playResult.getDevice(), channelId);
            RequestMessage msg = new RequestMessage();
            msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
            msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + playResult.getUuid());
            msg.setData("Timeout");
            resultHolder.invokeResult(msg);
        });
        return result;
        return playResult.getResult();
    }
    @PostMapping("/play/{streamId}/stop")
src/main/java/com/genersoft/iot/vmp/vmanager/play/bean/PlayResult.java
New file
@@ -0,0 +1,37 @@
package com.genersoft.iot.vmp.vmanager.play.bean;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import org.springframework.http.ResponseEntity;
import org.springframework.web.context.request.async.DeferredResult;
public class PlayResult {
    private DeferredResult<ResponseEntity<String>> result;
    private String uuid;
    private Device device;
    public DeferredResult<ResponseEntity<String>> getResult() {
        return result;
    }
    public void setResult(DeferredResult<ResponseEntity<String>> result) {
        this.result = result;
    }
    public String getUuid() {
        return uuid;
    }
    public void setUuid(String uuid) {
        this.uuid = uuid;
    }
    public Device getDevice() {
        return device;
    }
    public void setDevice(Device device) {
        this.device = device;
    }
}
src/main/java/com/genersoft/iot/vmp/vmanager/service/IPlayService.java
@@ -2,6 +2,11 @@
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult;
import org.springframework.http.ResponseEntity;
import org.springframework.web.context.request.async.DeferredResult;
/**
 * 点播处理
@@ -10,4 +15,6 @@
    void onPublishHandlerForPlayBack(JSONObject resonse, String deviceId, String channelId, String uuid);
    void onPublishHandlerForPlay(JSONObject resonse, String deviceId, String channelId, String uuid);
    /**
     * Starts live play for a device channel and returns the handles needed to follow the request.
     *
     * @param deviceId   id of the device to play from
     * @param channelId  id of the channel to play
     * @param event      optional callback that receives the hook payload once the play command's
     *                   stream hook fires; may be {@code null}
     * @param errorEvent optional callback that receives the SIP error event when the play command
     *                   fails; may be {@code null}
     * @return the deferred result, its generated uuid and the resolved device
     */
    PlayResult play(String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent);
}
src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java
@@ -4,19 +4,29 @@
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.play.PlayController;
import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult;
import com.genersoft.iot.vmp.vmanager.service.IPlayService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.message.Response;
import java.text.DecimalFormat;
import java.util.UUID;
@Service
public class PlayServiceImpl implements IPlayService {
@@ -27,11 +37,76 @@
    private IVideoManagerStorager storager;
    @Autowired
    private SIPCommander cmder;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private DeferredResultHolder resultHolder;
    @Autowired
    private ZLMRESTfulUtils zlmresTfulUtils;
    @Override
    public PlayResult play(String deviceId, String channelId, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) {
        PlayResult playResult = new PlayResult();
        Device device = storager.queryVideoDevice(deviceId);
        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
        playResult.setDevice(device);
        UUID uuid = UUID.randomUUID();
        playResult.setUuid(uuid.toString());
        DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>();
        playResult.setResult(result);
        // 录像查询以channelId作为deviceId查询
        resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result);
        if (streamInfo == null) {
            // 发送点播消息
            cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
                logger.info("收到订阅消息: " + response.toJSONString());
                onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
                if (hookEvent != null) {
                    hookEvent.response(response);
                }
            }, event -> {
                RequestMessage msg = new RequestMessage();
                msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
                Response response = event.getResponse();
                msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
                resultHolder.invokeResult(msg);
                if (errorEvent != null) {
                    errorEvent.response(event);
                }
            });
        } else {
            String streamId = streamInfo.getStreamId();
            JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(streamId);
            if (rtpInfo.getBoolean("exist")) {
                RequestMessage msg = new RequestMessage();
                msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
                msg.setData(JSON.toJSONString(streamInfo));
                resultHolder.invokeResult(msg);
            } else {
                redisCatchStorage.stopPlay(streamInfo);
                storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
                cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
                    logger.info("收到订阅消息: " + response.toJSONString());
                    onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
                }, event -> {
                    RequestMessage msg = new RequestMessage();
                    msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
                    Response response = event.getResponse();
                    msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
                    resultHolder.invokeResult(msg);
                });
            }
        }
        return playResult;
    }
    @Override
    public void onPublishHandlerForPlay(JSONObject resonse, String deviceId, String channelId, String uuid) {
        RequestMessage msg = new RequestMessage();