使用设备Id+通道Id作为session的识别标识,解决点播异常时无法释放session的问题
15个文件已修改
151 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java 36 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/web/ApiStreamController.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application.yml 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
web_src/src/components/channelList.vue 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
web_src/src/components/dialog/devicePlayer.vue 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java
@@ -16,6 +16,7 @@
    private ConcurrentHashMap<String, ClientTransaction> sessionMap = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, String> ssrcMap = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, String> streamIdMap = new ConcurrentHashMap<>();
    public String createPlaySsrc(){
        return SsrcUtil.getPlaySsrc();
@@ -25,18 +26,23 @@
        return SsrcUtil.getPlayBackSsrc();
    }
    
    public void put(String streamId,String ssrc,ClientTransaction transaction){
        sessionMap.put(streamId, transaction);
        ssrcMap.put(streamId, ssrc);
    public void put(String deviceId, String channelId ,String ssrc, String streamId, ClientTransaction transaction){
        sessionMap.put(deviceId + "_" + channelId, transaction);
        ssrcMap.put(deviceId + "_" + channelId, ssrc);
        streamIdMap.put(deviceId + "_" + channelId, streamId);
    }
    
    public ClientTransaction get(String streamId){
        return sessionMap.get(streamId);
    public ClientTransaction getTransaction(String deviceId, String channelId){
        return sessionMap.get(deviceId + "_" + channelId);
    }
    
    public void remove(String streamId) {
        sessionMap.remove(streamId);
        SsrcUtil.releaseSsrc(ssrcMap.get(streamId));
        ssrcMap.remove(streamId);
    public String getStreamId(String deviceId, String channelId){
        return streamIdMap.get(deviceId + "_" + channelId);
    }
    public void remove(String deviceId, String channelId) {
        sessionMap.remove(deviceId + "_" + channelId);
        SsrcUtil.releaseSsrc(ssrcMap.get(deviceId + "_" + channelId));
        ssrcMap.remove(deviceId + "_" + channelId);
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
@@ -87,7 +87,6 @@
    /**
     * 请求预览视频流
     *
     * @param device  视频设备
     * @param channelId  预览通道
     */
@@ -108,8 +107,8 @@
     * 
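     * @param deviceId  device ID; together with channelId it identifies the session to stop
     * @param channelId channel ID
     * @param okEvent   optional event callback (the two-argument overload passes none)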
     */
    void streamByeCmd(String ssrc, SipSubscribe.Event okEvent);
    void streamByeCmd(String ssrc);
    void streamByeCmd(String deviceId, String channelId, SipSubscribe.Event okEvent);
    /**
     * Same request as the three-argument overload, without an event callback.
     *
     * @param deviceId  device ID
     * @param channelId channel ID
     */
    void streamByeCmd(String deviceId, String channelId);
    /**
     * 语音广播
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -339,10 +339,10 @@
     */
    @Override
    public void playStreamCmd(Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) {
        String streamId = null;
        try {
            if (device == null) return;
            String ssrc = streamSession.createPlaySsrc();
            String streamId = null;
            if (rtpEnable) {
                streamId = String.format("gb_play_%s_%s", device.getDeviceId(), channelId);
            }else {
@@ -444,8 +444,11 @@
            Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, "FromInvt" + tm, null, ssrc, callIdHeader);
            ClientTransaction transaction = transmitRequest(device, request, errorEvent);
            streamSession.put(streamId,ssrc, transaction);
            ClientTransaction transaction = transmitRequest(device, request, (e -> {
                streamSession.remove(device.getDeviceId(), channelId);
                errorEvent.response(e);
            }));
            streamSession.put(device.getDeviceId(), channelId ,ssrc,streamId, transaction);
        } catch ( SipException | ParseException | InvalidArgumentException e) {
            e.printStackTrace();
@@ -552,7 +555,7 @@
            Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader);
            ClientTransaction transaction = transmitRequest(device, request, errorEvent);
            streamSession.put(streamId, ssrc, transaction);
            streamSession.put(device.getDeviceId(), channelId, ssrc, streamId, transaction);
        } catch ( SipException | ParseException | InvalidArgumentException e) {
            e.printStackTrace();
@@ -566,17 +569,17 @@
     * 
     */
    @Override
    public void streamByeCmd(String ssrc) {
        streamByeCmd(ssrc, null);
    public void streamByeCmd(String deviceId, String channelId) {
        streamByeCmd(deviceId, channelId, null);
    }
    @Override
    public void streamByeCmd(String streamId, SipSubscribe.Event okEvent) {
    public void streamByeCmd(String deviceId, String channelId, SipSubscribe.Event okEvent) {
        
        try {
            ClientTransaction transaction = streamSession.get(streamId);
            ClientTransaction transaction = streamSession.getTransaction(deviceId, channelId);
            // 服务重启后
            if (transaction == null) {
                StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
                StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
                if (streamInfo != null) {
                }
@@ -613,14 +616,9 @@
            }
            dialog.sendRequest(clientTransaction);
            streamSession.remove(streamId);
            zlmrtpServerFactory.closeRTPServer(streamId);
        } catch (TransactionDoesNotExistException e) {
            e.printStackTrace();
        } catch (SipException e) {
            e.printStackTrace();
        } catch (ParseException e) {
            zlmrtpServerFactory.closeRTPServer(streamSession.getStreamId(deviceId, channelId));
            streamSession.remove(deviceId, channelId);
        } catch (SipException | ParseException e) {
            e.printStackTrace();
        }
    }
@@ -641,7 +639,6 @@
     * 语音广播
     * 
     * @param device  视频设备
     * @param channelId  预览通道
     */
    @Override
    public boolean audioBroadcastCmd(Device device) {
@@ -1140,7 +1137,7 @@
     * @param device        视频设备
     * @param startPriority    报警起始级别(可选)
     * @param endPriority    报警终止级别(可选)
     * @param alarmMethods    报警方式条件(可选)
     * @param alarmMethod    报警方式条件(可选)
     * @param alarmType        报警类型
     * @param startTime        报警发生起始时间(可选)
     * @param endTime        报警发生终止时间(可选)
@@ -1428,5 +1425,6 @@
            String streamId = String.format("gb_play_%s_%s", device.getDeviceId(), channelId);
            zlmrtpServerFactory.closeRTPServer(streamId);
        }
        streamSession.remove(device.getDeviceId(), channelId);
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java
@@ -58,7 +58,7 @@
                redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
                if (zlmrtpServerFactory.totalReaderCount(sendRtpItem.getApp(), streamId) == 0) {
                    System.out.println(streamId + "无其它观看者,通知设备停止推流");
                    cmder.streamByeCmd(streamId);
                    cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId);
                }
            }
        } catch (SipException e) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java
@@ -922,7 +922,7 @@
                StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, "*");
                if (streamInfo != null) {
                    redisCatchStorage.stopPlayback(streamInfo);
                    cmder.streamByeCmd(streamInfo.getStreamId());
                    cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId());
                }
            }
        } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -306,12 +306,12 @@
                if (redisCatchStorage.isChannelSendingRTP(streamInfo.getChannelId())) {
                    ret.put("close", false);
                } else {
                    cmder.streamByeCmd(streamId);
                    cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId());
                    redisCatchStorage.stopPlay(streamInfo);
                    storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
                }
            }else{
                cmder.streamByeCmd(streamId);
                cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId());
                streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
                redisCatchStorage.stopPlayback(streamInfo);
            }
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -63,7 +63,16 @@
        playResult.setResult(result);
        // 录像查询以channelId作为deviceId查询
        resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result);
        // 超时处理
        result.onTimeout(()->{
            logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
            // 释放rtpserver
            cmder.closeRTPServer(playResult.getDevice(), channelId);
            RequestMessage msg = new RequestMessage();
            msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + playResult.getUuid());
            msg.setData("Timeout");
            resultHolder.invokeResult(msg);
        });
        if (streamInfo == null) {
            // 发送点播消息
            cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
@@ -76,6 +85,7 @@
                RequestMessage msg = new RequestMessage();
                msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
                Response response = event.getResponse();
                cmder.closeRTPServer(playResult.getDevice(), channelId);
                msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
                resultHolder.invokeResult(msg);
                if (errorEvent != null) {
@@ -107,6 +117,7 @@
                    logger.info("收到订阅消息: " + response.toJSONString());
                    onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
                }, event -> {
                    cmder.closeRTPServer(playResult.getDevice(), channelId);
                    RequestMessage msg = new RequestMessage();
                    msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
                    Response response = event.getResponse();
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -36,7 +36,7 @@
    StreamInfo queryPlaybackByStreamId(String steamId);
    StreamInfo queryPlayByDevice(String deviceId, String code);
    StreamInfo queryPlayByDevice(String deviceId, String channelId);
    /**
     * 更新流媒体信息
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -75,11 +75,11 @@
    }
    @Override
    public StreamInfo queryPlayByDevice(String deviceId, String code) {
    public StreamInfo queryPlayByDevice(String deviceId, String channelId) {
        // Scans for keys of the form <PLAYER_PREFIX>_*_<deviceId>_<channelId>.
        // NOTE(review): "%S" upper-cases the formatted prefix; confirm the keys are written the same way.
//        List<Object> playLeys = redis.keys(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX,
        List<Object> playLeys = redis.scan(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX,
                deviceId,
                code));
                channelId));
        // No matching key: report "not playing" as null.
        if (playLeys == null || playLeys.size() == 0) return null;
        // Only the first matching key is read.
        return (StreamInfo)redis.get(playLeys.get(0).toString());
    }
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
@@ -75,27 +75,19 @@
        PlayResult playResult = playService.play(deviceId, channelId, null, null);
        // 超时处理
        playResult.getResult().onTimeout(()->{
            logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
            // 释放rtpserver
            cmder.closeRTPServer(playResult.getDevice(), channelId);
            RequestMessage msg = new RequestMessage();
            msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + playResult.getUuid());
            msg.setData("Timeout");
            resultHolder.invokeResult(msg);
        });
        return playResult.getResult();
    }
    @ApiOperation("停止点播")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "streamId", value = "视频流ID", dataTypeClass = String.class),
            @ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class),
            @ApiImplicitParam(name = "channelId", value = "通道ID", dataTypeClass = String.class),
    })
    @GetMapping("/stop/{streamId}")
    public DeferredResult<ResponseEntity<String>> playStop(@PathVariable String streamId) {
    @GetMapping("/stop/{deviceId}/{channelId}")
    public DeferredResult<ResponseEntity<String>> playStop(@PathVariable String deviceId, @PathVariable String channelId) {
        logger.debug(String.format("设备预览/回放停止API调用,streamId:%s", streamId));
        logger.debug(String.format("设备预览/回放停止API调用,streamId:%s/$s", deviceId, channelId ));
        UUID uuid = UUID.randomUUID();
        DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>();
@@ -103,8 +95,8 @@
        // 录像查询以channelId作为deviceId查询
        resultHolder.put(DeferredResultHolder.CALLBACK_CMD_STOP + uuid, result);
        cmder.streamByeCmd(streamId, event -> {
            StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
        cmder.streamByeCmd(deviceId, channelId, event -> {
            StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
            if (streamInfo == null) {
                RequestMessage msg = new RequestMessage();
                msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
@@ -121,9 +113,10 @@
            }
        });
        if (streamId != null) {
        if (deviceId != null || channelId != null) {
            JSONObject json = new JSONObject();
            json.put("streamId", streamId);
            json.put("deviceId", deviceId);
            json.put("channelId", channelId);
            RequestMessage msg = new RequestMessage();
            msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
            msg.setData(json.toString());
@@ -138,7 +131,7 @@
        // 超时处理
        result.onTimeout(()->{
            logger.warn(String.format("设备预览/回放停止超时,streamId:%s ", streamId));
            logger.warn(String.format("设备预览/回放停止超时,deviceId/channelId:%s/$s ", deviceId, channelId));
            RequestMessage msg = new RequestMessage();
            msg.setId(DeferredResultHolder.CALLBACK_CMD_STOP + uuid);
            msg.setData("Timeout");
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java
@@ -84,7 +84,7 @@
        StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId);
        if (streamInfo != null) {
            // 停止之前的回放
            cmder.streamByeCmd(streamInfo.getStreamId());
            cmder.streamByeCmd(deviceId, channelId);
        }
        resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result);
        cmder.playbackStreamCmd(device, channelId, startTime, endTime, (JSONObject response) -> {
@@ -103,20 +103,22 @@
    @ApiOperation("停止视频回放")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "ssrc", value = "视频流标识", dataTypeClass = String.class),
            @ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class),
            @ApiImplicitParam(name = "channelId", value = "通道ID", dataTypeClass = String.class),
    })
    @GetMapping("/stop/{ssrc}")
    public ResponseEntity<String> playStop(@PathVariable String ssrc) {
    @GetMapping("/stop/{deviceId}/{channelId}")
    public ResponseEntity<String> playStop(@PathVariable String deviceId, @PathVariable String channelId) {
        cmder.streamByeCmd(ssrc);
        cmder.streamByeCmd(deviceId, channelId);
        if (logger.isDebugEnabled()) {
            logger.debug(String.format("设备录像回放停止 API调用,ssrc:%s", ssrc));
            logger.debug(String.format("设备录像回放停止 API调用,deviceId/channelId:%s/%s", deviceId, channelId));
        }
        if (ssrc != null) {
        if (deviceId != null && channelId != null) {
            JSONObject json = new JSONObject();
            json.put("ssrc", ssrc);
            json.put("deviceId", deviceId);
            json.put("channelId", channelId);
            return new ResponseEntity<String>(json.toString(), HttpStatus.OK);
        } else {
            logger.warn("设备录像回放停止API调用失败!");
src/main/java/com/genersoft/iot/vmp/web/ApiStreamController.java
@@ -163,7 +163,7 @@
            result.put("error","未找到流信息");
            return result;
        }
        cmder.streamByeCmd(streamInfo.getStreamId());
        cmder.streamByeCmd(serial, code);
        redisCatchStorage.stopPlay(streamInfo);
        storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
        return null;
src/main/resources/application.yml
@@ -1,3 +1,3 @@
spring:
  profiles:
    active: dev
    active: local
web_src/src/components/channelList.vue
@@ -216,12 +216,12 @@
            var that = this;
            this.$axios({
                method: 'get',
                url: '/api/play/stop/' + itemData.streamId
                url: '/api/play/stop/' + this.deviceId + "/" + itemData.channelId
            }).then(function (res) {
                console.log(JSON.stringify(res));
                that.initData();
            }).catch(function (error) {
              if (error.response.status == 402) { // 已经停止过
              if (error.response.status === 402) { // 已经停止过
                that.initData();
              }else {
                console.log(error)
web_src/src/components/dialog/devicePlayer.vue
@@ -415,7 +415,7 @@
            this.videoUrl = '';
            this.$axios({
                method: 'get',
                url: '/api/playback/stop/' + this.streamId
                url: '/api/playback/stop/' + this.deviceId + "/" + this.channelId
            }).then(function (res) {
                if (callback) callback()
            });