使用设备Id+通道Id作为session的识别标识,解决点播异常时无法释放session的问题
| | |
| | |
|
| | | private ConcurrentHashMap<String, ClientTransaction> sessionMap = new ConcurrentHashMap<>();
|
| | | private ConcurrentHashMap<String, String> ssrcMap = new ConcurrentHashMap<>();
|
| | | private ConcurrentHashMap<String, String> streamIdMap = new ConcurrentHashMap<>();
|
| | |
|
| | | public String createPlaySsrc(){
|
| | | return SsrcUtil.getPlaySsrc();
|
| | |
| | | return SsrcUtil.getPlayBackSsrc();
|
| | | }
|
| | |
|
| | | public void put(String streamId,String ssrc,ClientTransaction transaction){
|
| | | sessionMap.put(streamId, transaction);
|
| | | ssrcMap.put(streamId, ssrc);
|
| | | public void put(String deviceId, String channelId ,String ssrc, String streamId, ClientTransaction transaction){
|
| | | sessionMap.put(deviceId + "_" + channelId, transaction);
|
| | | ssrcMap.put(deviceId + "_" + channelId, ssrc);
|
| | | streamIdMap.put(deviceId + "_" + channelId, streamId);
|
| | | }
|
| | |
|
| | | public ClientTransaction get(String streamId){
|
| | | return sessionMap.get(streamId);
|
| | | public ClientTransaction getTransaction(String deviceId, String channelId){
|
| | | return sessionMap.get(deviceId + "_" + channelId);
|
| | | }
|
| | |
|
| | | public void remove(String streamId) {
|
| | | sessionMap.remove(streamId);
|
| | | SsrcUtil.releaseSsrc(ssrcMap.get(streamId));
|
| | | ssrcMap.remove(streamId);
|
| | | public String getStreamId(String deviceId, String channelId){
|
| | | return streamIdMap.get(deviceId + "_" + channelId);
|
| | | }
|
| | | |
| | | public void remove(String deviceId, String channelId) {
|
| | | sessionMap.remove(deviceId + "_" + channelId);
|
| | | SsrcUtil.releaseSsrc(ssrcMap.get(deviceId + "_" + channelId));
|
| | | ssrcMap.remove(deviceId + "_" + channelId);
|
| | | }
|
| | | }
|
| | |
| | |
|
| | | /**
|
| | | * 请求预览视频流
|
| | | * |
| | | * @param device 视频设备
|
| | | * @param channelId 预览通道
|
| | | */
|
| | |
| | | *
|
| | | * @param ssrc ssrc
|
| | | */
|
| | | void streamByeCmd(String ssrc, SipSubscribe.Event okEvent);
|
| | | void streamByeCmd(String ssrc);
|
| | | void streamByeCmd(String deviceId, String channelId, SipSubscribe.Event okEvent);
|
| | | void streamByeCmd(String deviceId, String channelId);
|
| | |
|
| | | /**
|
| | | * 语音广播
|
| | |
| | | */
|
| | | @Override
|
| | | public void playStreamCmd(Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) {
|
| | | String streamId = null;
|
| | | try {
|
| | | if (device == null) return;
|
| | | String ssrc = streamSession.createPlaySsrc();
|
| | | String streamId = null;
|
| | | if (rtpEnable) {
|
| | | streamId = String.format("gb_play_%s_%s", device.getDeviceId(), channelId);
|
| | | }else {
|
| | |
| | |
|
| | | Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, "FromInvt" + tm, null, ssrc, callIdHeader);
|
| | |
|
| | | ClientTransaction transaction = transmitRequest(device, request, errorEvent);
|
| | | streamSession.put(streamId,ssrc, transaction);
|
| | | ClientTransaction transaction = transmitRequest(device, request, (e -> {
|
| | | streamSession.remove(device.getDeviceId(), channelId);
|
| | | errorEvent.response(e);
|
| | | }));
|
| | | streamSession.put(device.getDeviceId(), channelId ,ssrc,streamId, transaction);
|
| | |
|
| | | } catch ( SipException | ParseException | InvalidArgumentException e) {
|
| | | e.printStackTrace();
|
| | |
| | | Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader);
|
| | |
|
| | | ClientTransaction transaction = transmitRequest(device, request, errorEvent);
|
| | | streamSession.put(streamId, ssrc, transaction);
|
| | | streamSession.put(device.getDeviceId(), channelId, ssrc, streamId, transaction);
|
| | |
|
| | | } catch ( SipException | ParseException | InvalidArgumentException e) {
|
| | | e.printStackTrace();
|
| | |
| | | *
|
| | | */
|
| | | @Override
|
| | | public void streamByeCmd(String ssrc) {
|
| | | streamByeCmd(ssrc, null);
|
| | | public void streamByeCmd(String deviceId, String channelId) {
|
| | | streamByeCmd(deviceId, channelId, null);
|
| | | }
|
| | | @Override
|
| | | public void streamByeCmd(String streamId, SipSubscribe.Event okEvent) {
|
| | | public void streamByeCmd(String deviceId, String channelId, SipSubscribe.Event okEvent) {
|
| | |
|
| | | try {
|
| | | ClientTransaction transaction = streamSession.get(streamId);
|
| | | ClientTransaction transaction = streamSession.getTransaction(deviceId, channelId);
|
| | | // 服务重启后
|
| | | if (transaction == null) {
|
| | | StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
|
| | | StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
|
| | | if (streamInfo != null) {
|
| | |
|
| | | }
|
| | |
| | | }
|
| | |
|
| | | dialog.sendRequest(clientTransaction);
|
| | |
|
| | | streamSession.remove(streamId);
|
| | | zlmrtpServerFactory.closeRTPServer(streamId);
|
| | | } catch (TransactionDoesNotExistException e) {
|
| | | e.printStackTrace();
|
| | | } catch (SipException e) {
|
| | | e.printStackTrace();
|
| | | } catch (ParseException e) {
|
| | | zlmrtpServerFactory.closeRTPServer(streamSession.getStreamId(deviceId, channelId));
|
| | | streamSession.remove(deviceId, channelId);
|
| | | } catch (SipException | ParseException e) {
|
| | | e.printStackTrace();
|
| | | }
|
| | | }
|
| | |
| | | * 语音广播
|
| | | *
|
| | | * @param device 视频设备
|
| | | * @param channelId 预览通道
|
| | | */
|
| | | @Override
|
| | | public boolean audioBroadcastCmd(Device device) {
|
| | |
| | | * @param device 视频设备
|
| | | * @param startPriority 报警起始级别(可选)
|
| | | * @param endPriority 报警终止级别(可选)
|
| | | * @param alarmMethods 报警方式条件(可选)
|
| | | * @param alarmMethod 报警方式条件(可选)
|
| | | * @param alarmType 报警类型
|
| | | * @param startTime 报警发生起始时间(可选)
|
| | | * @param endTime 报警发生终止时间(可选)
|
| | |
| | | String streamId = String.format("gb_play_%s_%s", device.getDeviceId(), channelId);
|
| | | zlmrtpServerFactory.closeRTPServer(streamId);
|
| | | }
|
| | | streamSession.remove(device.getDeviceId(), channelId);
|
| | | }
|
| | | }
|
| | |
| | | redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
|
| | | if (zlmrtpServerFactory.totalReaderCount(sendRtpItem.getApp(), streamId) == 0) {
|
| | | System.out.println(streamId + "无其它观看者,通知设备停止推流");
|
| | | cmder.streamByeCmd(streamId);
|
| | | cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId);
|
| | | }
|
| | | }
|
| | | } catch (SipException e) {
|
| | |
| | | StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, "*");
|
| | | if (streamInfo != null) {
|
| | | redisCatchStorage.stopPlayback(streamInfo);
|
| | | cmder.streamByeCmd(streamInfo.getStreamId());
|
| | | cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId());
|
| | | }
|
| | | }
|
| | | } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
|
| | |
| | | if (redisCatchStorage.isChannelSendingRTP(streamInfo.getChannelId())) {
|
| | | ret.put("close", false);
|
| | | } else {
|
| | | cmder.streamByeCmd(streamId);
|
| | | cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId());
|
| | | redisCatchStorage.stopPlay(streamInfo);
|
| | | storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
|
| | | }
|
| | | }else{
|
| | | cmder.streamByeCmd(streamId);
|
| | | cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId());
|
| | | streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
|
| | | redisCatchStorage.stopPlayback(streamInfo);
|
| | | }
|
| | |
| | | playResult.setResult(result); |
| | | // 录像查询以channelId作为deviceId查询 |
| | | resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result); |
| | | |
| | | // 超时处理 |
| | | result.onTimeout(()->{ |
| | | logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId)); |
| | | // 释放rtpserver |
| | | cmder.closeRTPServer(playResult.getDevice(), channelId); |
| | | RequestMessage msg = new RequestMessage(); |
| | | msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + playResult.getUuid()); |
| | | msg.setData("Timeout"); |
| | | resultHolder.invokeResult(msg); |
| | | }); |
| | | if (streamInfo == null) { |
| | | // 发送点播消息 |
| | | cmder.playStreamCmd(device, channelId, (JSONObject response) -> { |
| | |
| | | RequestMessage msg = new RequestMessage(); |
| | | msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); |
| | | Response response = event.getResponse(); |
| | | cmder.closeRTPServer(playResult.getDevice(), channelId); |
| | | msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase())); |
| | | resultHolder.invokeResult(msg); |
| | | if (errorEvent != null) { |
| | |
| | | logger.info("收到订阅消息: " + response.toJSONString()); |
| | | onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString()); |
| | | }, event -> { |
| | | cmder.closeRTPServer(playResult.getDevice(), channelId); |
| | | RequestMessage msg = new RequestMessage(); |
| | | msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); |
| | | Response response = event.getResponse(); |
| | |
| | | |
| | | StreamInfo queryPlaybackByStreamId(String steamId); |
| | | |
| | | StreamInfo queryPlayByDevice(String deviceId, String code); |
| | | StreamInfo queryPlayByDevice(String deviceId, String channelId); |
| | | |
| | | /** |
| | | * 更新流媒体信息 |
| | |
| | | } |
| | | |
| | | @Override |
| | | public StreamInfo queryPlayByDevice(String deviceId, String code) { |
| | | public StreamInfo queryPlayByDevice(String deviceId, String channelId) { |
| | | // List<Object> playLeys = redis.keys(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX, |
| | | List<Object> playLeys = redis.scan(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX, |
| | | deviceId, |
| | | code)); |
| | | channelId)); |
| | | if (playLeys == null || playLeys.size() == 0) return null; |
| | | return (StreamInfo)redis.get(playLeys.get(0).toString()); |
| | | } |
| | |
| | | |
| | | PlayResult playResult = playService.play(deviceId, channelId, null, null); |
| | | |
| | | // 超时处理 |
| | | playResult.getResult().onTimeout(()->{ |
| | | logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId)); |
| | | // 释放rtpserver |
| | | cmder.closeRTPServer(playResult.getDevice(), channelId); |
| | | RequestMessage msg = new RequestMessage(); |
| | | msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + playResult.getUuid()); |
| | | msg.setData("Timeout"); |
| | | resultHolder.invokeResult(msg); |
| | | }); |
| | | |
| | | return playResult.getResult(); |
| | | } |
| | | |
| | | @ApiOperation("停止点播") |
| | | @ApiImplicitParams({ |
| | | @ApiImplicitParam(name = "streamId", value = "视频流ID", dataTypeClass = String.class), |
| | | @ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class), |
| | | @ApiImplicitParam(name = "channelId", value = "通道ID", dataTypeClass = String.class), |
| | | }) |
| | | @GetMapping("/stop/{streamId}") |
| | | public DeferredResult<ResponseEntity<String>> playStop(@PathVariable String streamId) { |
| | | @GetMapping("/stop/{deviceId}/{channelId}") |
| | | public DeferredResult<ResponseEntity<String>> playStop(@PathVariable String deviceId, @PathVariable String channelId) { |
| | | |
| | | logger.debug(String.format("设备预览/回放停止API调用,streamId:%s", streamId)); |
| | | logger.debug(String.format("设备预览/回放停止API调用,streamId:%s/$s", deviceId, channelId )); |
| | | |
| | | UUID uuid = UUID.randomUUID(); |
| | | DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>(); |
| | |
| | | // 录像查询以channelId作为deviceId查询 |
| | | resultHolder.put(DeferredResultHolder.CALLBACK_CMD_STOP + uuid, result); |
| | | |
| | | cmder.streamByeCmd(streamId, event -> { |
| | | StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); |
| | | cmder.streamByeCmd(deviceId, channelId, event -> { |
| | | StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); |
| | | if (streamInfo == null) { |
| | | RequestMessage msg = new RequestMessage(); |
| | | msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); |
| | |
| | | } |
| | | }); |
| | | |
| | | if (streamId != null) { |
| | | if (deviceId != null || channelId != null) { |
| | | JSONObject json = new JSONObject(); |
| | | json.put("streamId", streamId); |
| | | json.put("deviceId", deviceId); |
| | | json.put("channelId", channelId); |
| | | RequestMessage msg = new RequestMessage(); |
| | | msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); |
| | | msg.setData(json.toString()); |
| | |
| | | |
| | | // 超时处理 |
| | | result.onTimeout(()->{ |
| | | logger.warn(String.format("设备预览/回放停止超时,streamId:%s ", streamId)); |
| | | logger.warn(String.format("设备预览/回放停止超时,deviceId/channelId:%s/$s ", deviceId, channelId)); |
| | | RequestMessage msg = new RequestMessage(); |
| | | msg.setId(DeferredResultHolder.CALLBACK_CMD_STOP + uuid); |
| | | msg.setData("Timeout"); |
| | |
| | | StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId); |
| | | if (streamInfo != null) { |
| | | // 停止之前的回放 |
| | | cmder.streamByeCmd(streamInfo.getStreamId()); |
| | | cmder.streamByeCmd(deviceId, channelId); |
| | | } |
| | | resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result); |
| | | cmder.playbackStreamCmd(device, channelId, startTime, endTime, (JSONObject response) -> { |
| | |
| | | |
| | | @ApiOperation("停止视频回放") |
| | | @ApiImplicitParams({ |
| | | @ApiImplicitParam(name = "ssrc", value = "视频流标识", dataTypeClass = String.class), |
| | | @ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class), |
| | | @ApiImplicitParam(name = "channelId", value = "通道ID", dataTypeClass = String.class), |
| | | }) |
| | | @GetMapping("/stop/{ssrc}") |
| | | public ResponseEntity<String> playStop(@PathVariable String ssrc) { |
| | | @GetMapping("/stop/{deviceId}/{channelId}") |
| | | public ResponseEntity<String> playStop(@PathVariable String deviceId, @PathVariable String channelId) { |
| | | |
| | | cmder.streamByeCmd(ssrc); |
| | | cmder.streamByeCmd(deviceId, channelId); |
| | | |
| | | if (logger.isDebugEnabled()) { |
| | | logger.debug(String.format("设备录像回放停止 API调用,ssrc:%s", ssrc)); |
| | | logger.debug(String.format("设备录像回放停止 API调用,deviceId/channelId:%s/%s", deviceId, channelId)); |
| | | } |
| | | |
| | | if (ssrc != null) { |
| | | if (deviceId != null && channelId != null) { |
| | | JSONObject json = new JSONObject(); |
| | | json.put("ssrc", ssrc); |
| | | json.put("deviceId", deviceId); |
| | | json.put("channelId", channelId); |
| | | return new ResponseEntity<String>(json.toString(), HttpStatus.OK); |
| | | } else { |
| | | logger.warn("设备录像回放停止API调用失败!"); |
| | |
| | | result.put("error","未找到流信息"); |
| | | return result; |
| | | } |
| | | cmder.streamByeCmd(streamInfo.getStreamId()); |
| | | cmder.streamByeCmd(serial, code); |
| | | redisCatchStorage.stopPlay(streamInfo); |
| | | storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); |
| | | return null; |
| | |
| | | spring: |
| | | profiles: |
| | | active: dev |
| | | active: local |
| | |
| | | var that = this; |
| | | this.$axios({ |
| | | method: 'get', |
| | | url: '/api/play/stop/' + itemData.streamId |
| | | url: '/api/play/stop/' + this.deviceId + "/" + itemData.channelId |
| | | }).then(function (res) { |
| | | console.log(JSON.stringify(res)); |
| | | that.initData(); |
| | | }).catch(function (error) { |
| | | if (error.response.status == 402) { // 已经停止过 |
| | | if (error.response.status === 402) { // 已经停止过 |
| | | that.initData(); |
| | | }else { |
| | | console.log(error) |
| | |
| | | this.videoUrl = ''; |
| | | this.$axios({ |
| | | method: 'get', |
| | | url: '/api/playback/stop/' + this.streamId |
| | | url: '/api/playback/stop/' + this.deviceId + "/" + this.channelId |
| | | }).then(function (res) { |
| | | if (callback) callback() |
| | | }); |