648540858
2022-03-03 d21322a93258206eb910d7ac3a70a4812fc48cbc
优化国标级联录像预览
10个文件已修改
2个文件已添加
224 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SDPInfo.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 80 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackCallback.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java 55 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 36 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SDPInfo.java
New file
@@ -0,0 +1,14 @@
package com.genersoft.iot.vmp.gb28181.bean;
import javax.sdp.SessionDescription;
public class SDPInfo {
    private byte[] source;
    private SessionDescription sdpSource;
    private String sessionName;
    private Long startTime;
    private Long stopTime;
    private String username;
    private String address;
    private String ssrc;
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -453,6 +453,7 @@
            subscribeKey.put("app", "rtp");
            subscribeKey.put("stream", ssrcInfo.getStream());
            subscribeKey.put("regist", true);
            subscribeKey.put("schema", "rtmp");
            subscribeKey.put("mediaServerId", mediaServerItem.getId());
            logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
            subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
@@ -718,6 +719,7 @@
            if (ssrcTransaction != null) {
                MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId());
                mediaServerService.releaseSsrc(mediaServerItem, ssrcTransaction.getSsrc());
                mediaServerService.closeRTPServer(deviceId, channelId, ssrcTransaction.getStream());
                streamSession.remove(deviceId, channelId, ssrcTransaction.getStream());
            }
        } catch (SipException | ParseException e) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -68,7 +68,7 @@
     */
    @Override
    public void process(RequestEvent evt) {
        logger.debug("ACK请求: {}", ((System.currentTimeMillis())));
        logger.info("ACK请求: {}", ((System.currentTimeMillis())));
        Dialog dialog = evt.getDialog();
        if (dialog == null) return;
        if (dialog.getState()== DialogState.CONFIRMED) {
@@ -88,10 +88,6 @@
                streamInfo = new StreamInfo();
                streamInfo.setApp(sendRtpItem.getApp());
                streamInfo.setStream(sendRtpItem.getStreamId());
            }else {
                streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
                sendRtpItem.setStreamId(streamInfo.getStream());
                streamInfo.setApp("rtp");
            }
            redisCatchStorage.updateSendRTPSever(sendRtpItem);
            logger.info(platformGbId);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -2,6 +2,7 @@
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
@@ -90,7 +91,7 @@
                    int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
                    if (totalReaderCount == 0) {
                        logger.info(streamId + "无其它观看者,通知设备停止推流");
                        cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId);
                        cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId);
                    }else if (totalReaderCount == -1){
                        logger.warn(streamId + " 查找其它观看者失败");
                    }
@@ -98,22 +99,24 @@
                // 可能是设备主动停止
                Device device = storager.queryVideoDeviceByChannelId(platformGbId);
                if (device != null) {
                    if (sendRtpItem.isPlay()) {
                        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
                    if (sendRtpItem != null) {
                        if (sendRtpItem.isPlay()) {
                        if (streamInfo != null) {
                            redisCatchStorage.stopPlay(streamInfo);
                        }
                    }else {
                        StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(device.getDeviceId(), channelId);
                        if (streamInfo != null) {
                            redisCatchStorage.stopPlayback(streamInfo);
                        }
                    }
                    storager.stopPlay(device.getDeviceId(), channelId);
                    mediaServerService.closeRTPServer(device, channelId, streamInfo.getStream());
                        mediaServerService.closeRTPServer(device.getDeviceId(), channelId, streamInfo.getStream());
                }
            }
            }
        } catch (SipException e) {
            e.printStackTrace();
        } catch (InvalidArgumentException e) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -12,6 +12,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@@ -101,19 +102,12 @@
    @Override
    public void process(RequestEvent evt) {
        //  Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令
        Long startTimeForInvite = System.currentTimeMillis();
        try {
            Request request = evt.getRequest();
            SipURI sipURI = (SipURI) request.getRequestURI();
            String channelId = sipURI.getUser();
            String requesterId = null;
            FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
            String requesterId = SipUtils.getUserIdFromFromHeader(request);
            CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
            AddressImpl address = (AddressImpl) fromHeader.getAddress();
            SipUri uri = (SipUri) address.getURI();
            requesterId = uri.getUser();
            if (requesterId == null || channelId == null) {
                logger.info("无法从FromHeader的Address中获取到平台id,返回400");
                responseAck(evt, Response.BAD_REQUEST); // 参数不全, 发400,请求错误
@@ -122,7 +116,9 @@
            // 查询请求是否来自上级平台\设备
            ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
            if (platform != null) {
            if (platform == null) {
                inviteFromDeviceHandle(evt, requesterId);
            }else {
                // 查询平台下是否有该通道
                DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
                GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
@@ -141,7 +137,7 @@
                    mediaServerItem = mediaServerService.getOne(mediaServerId);
                    if (mediaServerItem == null) {
                        logger.info("[ app={}, stream={} ]找不到zlm {},返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId);
                        responseAck(evt, Response.GONE, "media server not found");
                        responseAck(evt, Response.GONE);
                        return;
                    }
                    Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
@@ -197,7 +193,6 @@
                // 查看是否支持PS 负载96
                //String ip = null;
                int port = -1;
                //boolean recvonly = false;
                boolean mediaTransmissionTCP = false;
                Boolean tcpActive = null;
                for (Object description : mediaDescriptions) {
@@ -233,7 +228,6 @@
                }
                String username = sdp.getOrigin().getUsername();
                String addressStr = sdp.getOrigin().getAddress();
                //String sessionName = sdp.getSessionName().getValue();
                logger.info("[上级点播]用户:{}, 地址:{}:{}, ssrc:{}", username, addressStr, port, ssrc);
                Device device  = null;
                // 通过 channel 和 gbStream 是否为null 值判断来源是直播流还是国标
@@ -271,8 +265,10 @@
                    Long finalStartTime = startTime;
                    Long finalStopTime = stopTime;
                    ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{
                        logger.info("[上级点播]收到下级开始点播订阅, {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId());
                        // if (sendRtpItem == null) return;
                        logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId());
                        //     * 0 等待设备推流上来
                        //     * 1 下级已经推流,等待上级平台回复ack
                        //     * 2 推流中
                        sendRtpItem.setStatus(1);
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -301,9 +297,6 @@
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                        if ("Playback".equals(sessionName) && responseJSON != null) {
                            playService.onPublishHandlerForPlayBack(finalMediaServerItem, responseJSON, finalDevice.getDeviceId(), channelId, null);
                        }
                    };
                    SipSubscribe.Event errorEvent = ((event) -> {
                        // 未知错误。直接转发设备点播的错误
@@ -319,10 +312,29 @@
                    });
                    if ("Playback".equals(sessionName)) {
                        sendRtpItem.setPlay(false);
                        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, sendRtpItem.getSsrc(), true);
                        sendRtpItem.setStreamId(ssrc);
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        commander.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, format.format(start), format.format(end), hookEvent, errorEvent);
                        playService.playBack(device.getDeviceId(), channelId, format.format(start), format.format(end),result -> {
                            if (result.getCode() != 0){
                                logger.warn("录像回放失败");
                                if (result.getEvent() != null) {
                                    errorEvent.response(result.getEvent());
                                }
                                try {
                                    responseAck(evt, Response.REQUEST_TIMEOUT);
                                } catch (SipException e) {
                                    e.printStackTrace();
                                } catch (InvalidArgumentException e) {
                                    e.printStackTrace();
                                } catch (ParseException e) {
                                    e.printStackTrace();
                                }
                            }else {
                                if (result.getMediaServerItem() != null) {
                                    hookEvent.response(result.getMediaServerItem(), result.getResponse());
                                }
                            }
                        });
                    }else {
                        sendRtpItem.setPlay(true);
                        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
@@ -333,7 +345,7 @@
                            sendRtpItem.setPlay(false);
                            playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent,errorEvent);
                        }else {
                            sendRtpItem.setStreamId(streamInfo.getStreamId());
                            sendRtpItem.setStreamId(streamInfo.getStream());
                            hookEvent.response(mediaServerItem, null);
                        }
                    }
@@ -379,9 +391,24 @@
                    }
                }
            } else {
            }
        } catch (SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
            logger.warn("sdp解析错误");
            e.printStackTrace();
        } catch (SdpParseException e) {
            e.printStackTrace();
        } catch (SdpException e) {
            e.printStackTrace();
        }
    }
    public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException {
                // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
                Device device = redisCatchStorage.getDevice(requesterId);
        Request request = evt.getRequest();
                if (device != null) {
                    logger.info("收到设备" + requesterId + "的语音广播Invite请求");
                    responseAck(evt, Response.TRYING);
@@ -444,17 +471,6 @@
                } else {
                    logger.warn("来自无效设备/平台的请求");
                    responseAck(evt, Response.BAD_REQUEST);
                }
            }
        } catch (SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
            logger.warn("sdp解析错误");
            e.printStackTrace();
        } catch (SdpParseException e) {
            e.printStackTrace();
        } catch (SdpException e) {
            e.printStackTrace();
        }
    }
}
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
@@ -48,7 +48,7 @@
    SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean isPlayback);
    // Closes the RTP server associated with a device channel.
    // NOTE(review): this listing shows two signatures, one taking a Device and
    // one taking the device id as a String; confirm which one is meant to remain.
    // NOTE(review): the third parameter is called "ssrc" here, but the
    // implementation shown in this listing names it "stream" and uses it as the
    // stream key for the session lookup — confirm the intended meaning.
    void closeRTPServer(Device device, String channelId, String ssrc);
    void closeRTPServer(String deviceId, String channelId, String ssrc);
    void clearRTPServer(MediaServerItem mediaServerItem);
src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackCallback.java
@@ -1,9 +1,10 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
/**
 * Callback used to hand the outcome of a playback request back to the caller.
 */
public interface PlayBackCallback {
    // NOTE(review): this listing carries both the RequestMessage signature and
    // the PlayBackResult signature. With two abstract methods the interface
    // cannot be the target of a lambda expression — confirm which single
    // signature is meant to remain.
    void call(RequestMessage msg);
    // Receives the playback outcome wrapped in a PlayBackResult, which carries
    // a status code alongside the RequestMessage payload.
    void call(PlayBackResult<RequestMessage> msg);
}
src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java
New file
@@ -0,0 +1,55 @@
package com.genersoft.iot.vmp.service.bean;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import javax.sip.RequestEvent;
public class PlayBackResult<T> {
     private int code;
     private T data;
     private MediaServerItem mediaServerItem;
    private JSONObject response;
    private SipSubscribe.EventResult event;
    public int getCode() {
        return code;
    }
    public void setCode(int code) {
        this.code = code;
    }
    public T getData() {
        return data;
    }
    public void setData(T data) {
        this.data = data;
    }
    public MediaServerItem getMediaServerItem() {
        return mediaServerItem;
    }
    public void setMediaServerItem(MediaServerItem mediaServerItem) {
        this.mediaServerItem = mediaServerItem;
    }
    public JSONObject getResponse() {
        return response;
    }
    public void setResponse(JSONObject response) {
        this.response = response;
    }
    public SipSubscribe.EventResult getEvent() {
        return event;
    }
    public void setEvent(SipSubscribe.EventResult event) {
        this.event = event;
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -160,16 +160,16 @@
    }
    /**
     * Closes the RTP server tied to a device channel and drops the matching
     * stream session.
     *
     * Looks up the media server id and SSRC recorded in the stream session for
     * (device, channel, stream). If that media server is found, the RTP server
     * is closed on it and the SSRC is released. The session entry is removed
     * in every case.
     *
     * NOTE(review): this listing interleaves two revisions of the method (one
     * taking a Device, one taking the device id as a String); the comments
     * apply to both.
     */
    @Override
    public void closeRTPServer(Device device, String channelId, String stream) {
        String mediaServerId = streamSession.getMediaServerId(device.getDeviceId(), channelId, stream);
        String ssrc = streamSession.getSSRC(device.getDeviceId(), channelId, stream);
    public void closeRTPServer(String deviceId, String channelId, String stream) {
        // Resolve the media server and SSRC recorded for this stream session.
        String mediaServerId = streamSession.getMediaServerId(deviceId, channelId, stream);
        String ssrc = streamSession.getSSRC(deviceId, channelId, stream);
        MediaServerItem mediaServerItem = this.getOne(mediaServerId);
        if (mediaServerItem != null) {
            // NOTE(review): the RTP server is closed under "<deviceId>_<channelId>",
            // not under the "stream" argument — confirm this matches the id the
            // server was opened with (e.g. for playback streams).
            String streamId = String.format("%s_%s", device.getDeviceId(), channelId);
            String streamId = String.format("%s_%s", deviceId, channelId);
            zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
            releaseSsrc(mediaServerItem, ssrc);
        }
        // The session entry is removed even when no media server was found.
        streamSession.remove(device.getDeviceId(), channelId, stream);
        streamSession.remove(deviceId, channelId, stream);
    }
    @Override
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -17,6 +17,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
import com.genersoft.iot.vmp.service.bean.PlayBackResult;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
@@ -115,11 +116,8 @@
            msg.setData(wvpResult);
            // 点播超时回复BYE
            cmder.streamByeCmd(device.getDeviceId(), channelId, streamInfo.getStream());
            // 释放rtpserver
            mediaServerService.closeRTPServer(playResult.getDevice(), channelId, streamInfo.getStream());
            // 回复之前所有的点播请求
            resultHolder.invokeAllResult(msg);
            // TODO 释放ssrc
        });
        result.onCompletion(()->{
            // 点播结束时调用截图接口
@@ -173,7 +171,10 @@
                WVPResult wvpResult = new WVPResult();
                wvpResult.setCode(-1);
                // 点播返回sip错误
                mediaServerService.closeRTPServer(playResult.getDevice(), channelId, ssrcInfo.getStream());
                mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream());
                // 释放ssrc
                mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc());
                streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
                wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
                msg.setData(wvpResult);
                resultHolder.invokeAllResult(msg);
@@ -222,7 +223,10 @@
                    logger.info("收到订阅消息: " + response.toJSONString());
                    onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid);
                }, (event) -> {
                    mediaServerService.closeRTPServer(playResult.getDevice(), channelId, ssrcInfo.getStream());
                    mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream());
                    // 释放ssrc
                    mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc());
                    streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
                    WVPResult wvpResult = new WVPResult();
                    wvpResult.setCode(-1);
                    wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
@@ -240,7 +244,7 @@
        RequestMessage msg = new RequestMessage();
        msg.setId(uuid);
        msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, resonse, deviceId, channelId);
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
        if (streamInfo != null) {
            DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
            if (deviceChannel != null) {
@@ -298,9 +302,12 @@
        RequestMessage msg = new RequestMessage();
        msg.setId(uuid);
        msg.setKey(key);
        PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>();
        result.onTimeout(()->{
            msg.setData("回放超时");
            callback.call(msg);
            playBackResult.setCode(-1);
            playBackResult.setData(msg);
            callback.call(playBackResult);
        });
        cmder.playbackStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, (MediaServerItem mediaServerItem, JSONObject response) -> {
            logger.info("收到订阅消息: " + response.toJSONString());
@@ -308,15 +315,24 @@
            if (streamInfo == null) {
                logger.warn("设备回放API调用失败!");
                msg.setData("设备回放API调用失败!");
                callback.call(msg);
                playBackResult.setCode(-1);
                playBackResult.setData(msg);
                callback.call(playBackResult);
                return;
            }
            redisCatchStorage.startPlayback(streamInfo);
            msg.setData(JSON.toJSONString(streamInfo));
            callback.call(msg);
            playBackResult.setCode(0);
            playBackResult.setData(msg);
            playBackResult.setMediaServerItem(mediaServerItem);
            playBackResult.setResponse(response);
            callback.call(playBackResult);
        }, event -> {
            msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
            callback.call(msg);
            playBackResult.setCode(-1);
            playBackResult.setData(msg);
            playBackResult.setEvent(event);
            callback.call(playBackResult);
        });
        return result;
    }
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
@@ -129,7 +129,6 @@
            //Response response = event.getResponse();
            msg.setData(String.format("success"));
            resultHolder.invokeAllResult(msg);
            mediaServerService.closeRTPServer(device, channelId, streamInfo.getStream());
        });
        if (deviceId != null || channelId != null) {
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java
@@ -77,8 +77,8 @@
            logger.debug(String.format("设备回放 API调用,deviceId:%s ,channelId:%s", deviceId, channelId));
        }
        DeferredResult<ResponseEntity<String>> result = playService.playBack(deviceId, channelId, startTime, endTime, msg->{
            resultHolder.invokeResult(msg);
        DeferredResult<ResponseEntity<String>> result = playService.playBack(deviceId, channelId, startTime, endTime, wvpResult->{
            resultHolder.invokeResult(wvpResult.getData());
        });
        return result;