From e94b99d11c46246532edc93cd25cbf8c0b88f03f Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期日, 27 二月 2022 20:01:31 +0800 Subject: [PATCH] 实现国标录像级联播放,优化点播流程,加快点播速度 --- src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java | 26 ++++ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java | 16 +- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java | 9 + src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 5 src/main/java/com/genersoft/iot/vmp/gb28181/session/SsrcConfig.java | 1 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java | 2 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java | 1 src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java | 8 src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java | 2 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java | 84 ++++++++----- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 88 ++++++++++++-- src/test/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImplTest.java | 7 - src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java | 3 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java | 30 ++-- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 2 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 1 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java | 19 ++ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java | 3 src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java | 1 19 files changed, 208 insertions(+), 100 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index 2c9c494..3e5d222 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -71,6 +71,16 @@ */ private String mediaServerId; + /** + * invite鐨刢allId + */ + private String CallId; + + /** + * 鏄惁鏄痯lay锛� false鏄痯layback + */ + private boolean isPlay; + public String getIp() { return ip; } @@ -174,4 +184,20 @@ public void setMediaServerId(String mediaServerId) { this.mediaServerId = mediaServerId; } + + public String getCallId() { + return CallId; + } + + public void setCallId(String callId) { + CallId = callId; + } + + public boolean isPlay() { + return isPlay; + } + + public void setPlay(boolean play) { + isPlay = play; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SsrcConfig.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SsrcConfig.java index e96e6a5..ac54c2d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SsrcConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SsrcConfig.java @@ -81,7 +81,6 @@ isUsed.remove(sn); notUsed.add(sn); }catch (NullPointerException e){ - System.out.printf("11111"); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java index 0d56bd5..f0d9033 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java @@ -36,7 +36,6 @@ SubscribeInfo subscribe = redisCatchStorage.getSubscribe(key); if (subscribe != null) { - System.out.println("鍙戦�丟PS娑堟伅"); ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); if (parentPlatform == null || parentPlatform.isStatus()) { // TODO 鏆傛椂鍙鐞嗚棰戞祦鐨勫洖澶�,鍚庣画澧炲姞瀵瑰浗鏍囪澶囩殑鏀寔 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index 71025c0..d352bb2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -141,7 +141,6 @@ */ @Override public void processTimeout(TimeoutEvent timeoutEvent) { - System.out.println("processTimeout"); if(timeoutProcessor != null) { timeoutProcessor.process(timeoutEvent); } @@ -173,7 +172,6 @@ @Override public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) { - System.out.println("processDialogTerminated"); CallIdHeader callId = dialogTerminatedEvent.getDialog().getCallId(); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index b0593bd..da664dd 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -346,8 +346,11 @@ subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return; - event.response(mediaServerItemInUse, json); - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); + if (event != null) { + event.response(mediaServerItemInUse, json); + } + +// subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); }); // StringBuffer content = new StringBuffer(200); @@ -452,9 +455,11 @@ logger.debug("褰曞儚鍥炴斁娣诲姞璁㈤槄锛岃闃呭唴瀹癸細" + subscribeKey.toString()); subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + System.out.println(344444); if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return; - event.response(mediaServerItemInUse, json); - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); + if (event != null) { + event.response(mediaServerItemInUse, json); + } }); StringBuffer content = new StringBuffer(200); @@ -465,8 +470,6 @@ content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("t="+DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime)+" " +DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime) +"\r\n"); - - String streamMode = device.getStreamMode().toUpperCase(); @@ -1202,7 +1205,6 @@ if (type == null) { type = "all"; } - try { StringBuffer recordInfoXml = new StringBuffer(200); recordInfoXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index bd41ddb..637381f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -508,9 +508,6 @@ // callid CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); - System.out.println( - recordXml.toString() - ); Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, recordXml.toString(), fromTag, callIdHeader); transmitRequest(parentPlatform, request); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 127ef29..1e99c0b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -1,10 +1,13 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; @@ -24,6 +27,8 @@ import javax.sip.header.ToHeader; import java.util.HashMap; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; /** * SIP鍛戒护绫诲瀷锛� ACK璇锋眰 @@ -52,6 +57,9 @@ @Autowired private IMediaServerService mediaServerService; + @Autowired + private ZLMHttpHookSubscribe subscribe; + /** * 澶勭悊 ACK璇锋眰 @@ -60,6 +68,7 @@ */ @Override public void process(RequestEvent evt) { + logger.debug("ACK璇锋眰锛� {}", ((System.currentTimeMillis()))); Dialog dialog = evt.getDialog(); if (dialog == null) return; if (dialog.getState()== DialogState.CONFIRMED) { @@ -69,16 +78,17 @@ String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; String deviceId = sendRtpItem.getDeviceId(); StreamInfo streamInfo = null; - if (deviceId == null) { + if (sendRtpItem.isPlay()) { + streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); + }else { + streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId); + } + System.out.println(JSON.toJSON(streamInfo)); + if (streamInfo == null) { streamInfo = new StreamInfo(); streamInfo.setApp(sendRtpItem.getApp()); streamInfo.setStreamId(sendRtpItem.getStreamId()); - }else { - streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); - sendRtpItem.setStreamId(streamInfo.getStreamId()); - streamInfo.setApp("rtp"); } - redisCatchStorage.updateSendRTPSever(sendRtpItem); logger.info(platformGbId); logger.info(channelId); @@ -90,34 +100,42 @@ param.put("dst_url",sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); param.put("is_udp", is_Udp); - //param.put ("src_port", sendRtpItem.getLocalPort()); // 璁惧鎺ㄦ祦鏌ヨ锛屾垚鍔熷悗鎵嶈兘杞帹 - boolean rtpPushed = false; - long startTime = System.currentTimeMillis(); - while (!rtpPushed) { - try { - if (System.currentTimeMillis() - startTime < 30 * 1000) { - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) { - rtpPushed = true; - logger.info("宸茶幏鍙栬澶囨帹娴乕{}/{}]锛屽紑濮嬪悜涓婄骇鎺ㄦ祦[{}:{}]", - streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); - zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - } else { - logger.info("绛夊緟璁惧鎺ㄦ祦[{}/{}].......", - streamInfo.getApp() ,streamInfo.getStreamId()); - Thread.sleep(1000); - continue; - } - } else { - rtpPushed = true; - logger.info("璁惧鎺ㄦ祦[{}/{}]瓒呮椂锛岀粓姝㈠悜涓婄骇鎺ㄦ祦", - streamInfo.getApp() ,streamInfo.getStreamId()); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - } + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); +// if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) { +// logger.info("宸茶幏鍙栬澶囨帹娴乕{}/{}]锛屽紑濮嬪悜涓婄骇鎺ㄦ祦[{}:{}]", +// streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); +// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); +// } else { +// // 瀵筯ook杩涜璁㈤槄 +// logger.info("绛夊緟璁惧鎺ㄦ祦[{}/{}].......", +// streamInfo.getApp(), streamInfo.getStreamId()); +// Timer timer = new Timer(); +// timer.schedule(new TimerTask() { +// @Override +// public void run() { +// logger.info("璁惧鎺ㄦ祦[{}/{}]瓒呮椂锛岀粓姝㈠悜涓婄骇鎺ㄦ祦", +// finalStreamInfo.getApp() , finalStreamInfo.getStreamId()); +// +// } +// }, 30*1000L); +// // 娣诲姞璁㈤槄 +// JSONObject subscribeKey = new JSONObject(); +// subscribeKey.put("app", "rtp"); +// subscribeKey.put("stream", streamInfo.getStreamId()); +// subscribeKey.put("mediaServerId", streamInfo.getMediaServerId()); +// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, subscribeKey, +// (MediaServerItem mediaServerItemInUse, JSONObject json) -> { +// logger.info("宸茶幏鍙栬澶囨帹娴乕{}/{}]锛屽紑濮嬪悜涓婄骇鎺ㄦ祦[{}:{}]", +// finalStreamInfo.getApp(), finalStreamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); +// timer.cancel(); +// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); +// subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); +// }); +// } + + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index feb44c5..9b6e727 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -87,18 +87,29 @@ MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); redisCatchStorage.deleteSendRTPServer(platformGbId, channelId); - if (zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId) == 0) { + int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); + if (totalReaderCount == 0) { logger.info(streamId + "鏃犲叾瀹冭鐪嬭�咃紝閫氱煡璁惧鍋滄鎺ㄦ祦"); cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId); + }else if (totalReaderCount == -1){ + logger.warn(streamId + " 鏌ユ壘鍏跺畠瑙傜湅鑰呭け璐�"); } } // 鍙兘鏄澶囦富鍔ㄥ仠姝� Device device = storager.queryVideoDeviceByChannelId(platformGbId); if (device != null) { - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); - if (streamInfo != null) { - redisCatchStorage.stopPlay(streamInfo); + if (sendRtpItem.isPlay()) { + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); + if (streamInfo != null) { + redisCatchStorage.stopPlay(streamInfo); + } + }else { + StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(device.getDeviceId(), channelId); + if (streamInfo != null) { + redisCatchStorage.stopPlayback(streamInfo); + } } + storager.stopPlay(device.getDeviceId(), channelId); mediaServerService.closeRTPServer(device, channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index ae2819c..a157a5c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -1,18 +1,28 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; +import gov.nist.javax.sdp.TimeDescriptionImpl; +import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sip.address.AddressImpl; import gov.nist.javax.sip.address.SipUri; import org.slf4j.Logger; @@ -27,10 +37,13 @@ import javax.sip.ServerTransaction; import javax.sip.SipException; import javax.sip.address.SipURI; +import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.List; import java.util.Vector; @@ -61,6 +74,9 @@ private IPlayService playService; @Autowired + private ISIPCommander commander; + + @Autowired private ZLMRTPServerFactory zlmrtpServerFactory; @Autowired @@ -68,6 +84,7 @@ @Autowired private SIPProcessorObserver sipProcessorObserver; + @Override public void afterPropertiesSet() throws Exception { @@ -84,6 +101,7 @@ @Override public void process(RequestEvent evt) { // Invite Request娑堟伅瀹炵幇锛屾娑堟伅涓�鑸负绾ц仈娑堟伅锛屼笂绾х粰涓嬬骇鍙戦�佽姹傝棰戞寚浠� + Long startTimeForInvite = System.currentTimeMillis(); try { Request request = evt.getRequest(); SipURI sipURI = (SipURI) request.getRequestURI(); @@ -91,6 +109,7 @@ String requesterId = null; FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME); + CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); AddressImpl address = (AddressImpl) fromHeader.getAddress(); SipUri uri = (SipUri) address.getURI(); requesterId = uri.getUser(); @@ -101,7 +120,7 @@ return; } - // 鏌ヨ璇锋眰鏂规槸鍚︿笂绾у钩鍙� + // 鏌ヨ璇锋眰鏄惁鏉ヨ嚜涓婄骇骞冲彴\璁惧 ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); if (platform != null) { // 鏌ヨ骞冲彴涓嬫槸鍚︽湁璇ラ�氶亾 @@ -158,7 +177,21 @@ ssrc = ssrcDefault; sdp = SdpFactory.getInstance().createSessionDescription(contentString); } + String sessionName = sdp.getSessionName().getValue(); + Long startTime = null; + Long stopTime = null; + Date start = null; + Date end = null; + if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) { + TimeDescriptionImpl timeDescription = (TimeDescriptionImpl)(sdp.getTimeDescriptions(false).get(0)); + TimeField startTimeFiled = (TimeField)timeDescription.getTime(); + startTime = startTimeFiled.getStartTime(); + stopTime = startTimeFiled.getStopTime(); + + start = new Date(startTime*1000); + end = new Date(stopTime*1000); + } // 鑾峰彇鏀寔鐨勬牸寮� Vector mediaDescriptions = sdp.getMediaDescriptions(true); // 鏌ョ湅鏄惁鏀寔PS 璐熻浇96 @@ -228,23 +261,31 @@ responseAck(evt, Response.BUSY_HERE); return; } - + sendRtpItem.setCallId(callIdHeader.getCallId()); + sendRtpItem.setPlay("Play".equals(sessionName)); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� redisCatchStorage.updateSendRTPSever(sendRtpItem); - // 閫氱煡涓嬬骇鎺ㄦ祦锛� - PlayResult playResult = playService.play(mediaServerItem,device.getDeviceId(), channelId, (mediaServerItemInUSe, responseJSON)->{ - // 鏀跺埌鎺ㄦ祦锛� 鍥炲200OK, 绛夊緟ack + + Device finalDevice = device; + MediaServerItem finalMediaServerItem = mediaServerItem; + Long finalStartTime = startTime; + Long finalStopTime = stopTime; + ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{ + logger.info("[涓婄骇鐐规挱]鏀跺埌涓嬬骇寮�濮嬬偣鎾闃咃紝 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId()); // if (sendRtpItem == null) return; sendRtpItem.setStatus(1); redisCatchStorage.updateSendRTPSever(sendRtpItem); - // TODO 娣诲姞瀵箃cp鐨勬敮鎸� StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n"); - content.append("s=Play\r\n"); + content.append("s=" + sessionName+"\r\n"); content.append("c=IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n"); - content.append("t=0 0\r\n"); + if ("Playback".equals(sessionName)) { + content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); + }else { + content.append("t=0 0\r\n"); + } content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); content.append("a=sendonly\r\n"); content.append("a=rtpmap:96 PS/90000\r\n"); @@ -260,7 +301,11 @@ } catch (ParseException e) { e.printStackTrace(); } - } ,((event) -> { + if ("Playback".equals(sessionName) && responseJSON != null) { + playService.onPublishHandlerForPlayBack(finalMediaServerItem, responseJSON, finalDevice.getDeviceId(), channelId, null); + } + }; + SipSubscribe.Event errorEvent = ((event) -> { // 鏈煡閿欒銆傜洿鎺ヨ浆鍙戣澶囩偣鎾殑閿欒 Response response = null; try { @@ -271,11 +316,27 @@ } catch (ParseException | SipException | InvalidArgumentException e) { e.printStackTrace(); } - })); - if (logger.isDebugEnabled()) { - logger.debug(playResult.getResult().toString()); + }); + if ("Playback".equals(sessionName)) { + sendRtpItem.setPlay(false); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, sendRtpItem.getSsrc(), true); + sendRtpItem.setStreamId(ssrc); + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + commander.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, format.format(start), format.format(end), hookEvent, errorEvent); + }else { + sendRtpItem.setPlay(true); + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); + if (streamInfo == null) { + if (mediaServerItem.isRtpEnable()) { + sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId)); + } + sendRtpItem.setPlay(false); + playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent,errorEvent); + }else { + sendRtpItem.setStreamId(streamInfo.getStreamId()); + hookEvent.response(mediaServerItem, null); + } } - }else if (gbStream != null) { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, gbStream.getApp(), gbStream.getStream(), channelId, @@ -295,7 +356,6 @@ sendRtpItem.setStatus(1); redisCatchStorage.updateSendRTPSever(sendRtpItem); - // TODO 娣诲姞瀵箃cp鐨勬敮鎸� StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java index 5446a90..1b5081b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java @@ -82,9 +82,6 @@ requestURI.setPort(event.getRemotePort()); reqAck.setRequestURI(requestURI); logger.info("鍚� " + event.getRemoteIpAddress() + ":" + event.getRemotePort() + "鍥炲ack"); - SipURI sipURI = (SipURI)dialog.getRemoteParty().getURI(); - String deviceId = requestURI.getUser(); - String channelId = sipURI.getUser(); dialog.sendAck(reqAck); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 4f45813..e2c83ea 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -181,7 +181,7 @@ @PostMapping(value = "/on_publish", produces = "application/json;charset=UTF-8") public ResponseEntity<String> onPublish(@RequestBody JSONObject json) { - logger.debug("[ ZLM HOOK ]on_publish API璋冪敤锛屽弬鏁帮細" + json.toString()); + logger.info("[ ZLM HOOK ]on_publish API璋冪敤锛屽弬鏁帮細" + json.toString()); JSONObject ret = new JSONObject(); ret.put("code", 0); ret.put("msg", "success"); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java index c8cca53..84b36e3 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java @@ -77,21 +77,23 @@ if (eventMap == null) { return; } - Iterator<Map.Entry<JSONObject, Event>> iterator = eventMap.entrySet().iterator(); - while (iterator.hasNext()){ - Map.Entry<JSONObject, Event> next = iterator.next(); - JSONObject key = next.getKey(); - Boolean result = null; - for (String s : key.keySet()) { - if (result == null) { - result = key.getString(s).equals(hookResponse.getString(s)); - }else { - if (key.getString(s) == null) continue; - result = result && key.getString(s).equals(hookResponse.getString(s)); + + Set<Map.Entry<JSONObject, Event>> entries = eventMap.entrySet(); + if (entries.size() > 0) { + for (Map.Entry<JSONObject, Event> entry : entries) { + JSONObject key = entry.getKey(); + Boolean result = null; + for (String s : key.keySet()) { + if (result == null) { + result = key.getString(s).equals(hookResponse.getString(s)); + }else { + if (key.getString(s) == null) continue; + result = result && key.getString(s).equals(hookResponse.getString(s)); + } } - } - if (null != result && result){ - iterator.remove(); + if (null != result && result){ + entries.remove(entry); + } } } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 05ecd3f..d0b1cb2 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -72,7 +72,6 @@ ResponseBody responseBody = response.body(); if (responseBody != null) { String responseStr = responseBody.string(); - System.out.println(responseStr); responseJSON = JSON.parseObject(responseStr); } }else { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 30a1509..76bab9c 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -242,9 +242,18 @@ */ public int totalReaderCount(MediaServerItem mediaServerItem, String app, String streamId) { JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtmp", streamId); + Integer code = mediaInfo.getInteger("code"); if (mediaInfo == null) { return 0; } + if ( code < 0) { + logger.warn("鏌ヨ娴�({}/{})鏄惁鏈夊叾瀹冭鐪嬭�呮椂寰楀埌锛� {}", app, streamId, mediaInfo.getString("msg")); + return -1; + } + if ( code == 0 && ! mediaInfo.getBoolean("online")) { + logger.warn("鏌ヨ娴�({}/{})鏄惁鏈夊叾瀹冭鐪嬭�呮椂寰楀埌锛� {}", app, streamId, mediaInfo.getString("msg")); + return -1; + } return mediaInfo.getInteger("totalReaderCount"); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index cf30a79..d3f6976 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -122,7 +122,6 @@ // 鐐规挱缁撴潫鏃惰皟鐢ㄦ埅鍥炬帴鍙� try { String classPath = ResourceUtils.getURL("classpath:").getPath(); - // System.out.println(classPath); // 鍏煎鎵撳寘涓簀ar鐨刢lass璺緞 if(classPath.contains("jar")) { classPath = classPath.substring(0, classPath.lastIndexOf(".")); @@ -238,11 +237,11 @@ } @Override - public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) { + public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { RequestMessage msg = new RequestMessage(); msg.setId(uuid); msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); - StreamInfo streamInfo = onPublishHandler(mediaServerItem, resonse, deviceId, channelId, uuid); + StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId, uuid); if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java index d10dde5..f74b6d4 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java @@ -88,7 +88,7 @@ "</script>"}) int setDefaultCatalog(String platformId, String catalogId); - @Select("select 'channel' as name, count(pgc.platformId) count from platform_gb_channel pgc where pgc.platformId=#{platformId} and pgc.channelId =#{gbId} " + + @Select("select 'channel' as name, count(pgc.platformId) count from platform_gb_channel pgc left join device_channel dc on dc.id = pgc.deviceChannelId where pgc.platformId=#{platformId} and dc.channelId =#{gbId} " + "union " + "select 'stream' as name, count(pgs.platformId) count from platform_gb_stream pgs left join gb_stream gs on pgs.gbStreamId = gs.gbStreamId where pgs.platformId=#{platformId} and gs.gbId = #{gbId}") List<ChannelSourceInfo> getChannelSource(String platformId, String gbId); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java index 3df38fc..0abff27 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java @@ -52,8 +52,8 @@ int cleanChannelForGB(String platformId); - @Select("SELECT * FROM device_channel WHERE deviceId = (SELECT deviceId FROM platform_gb_channel WHERE " + - "platformId='${platformId}' AND channelId='${channelId}' ) AND channelId='${channelId}'") + @Select("SELECT dc.* FROM platform_gb_channel pgc left join device_channel dc on dc.id = pgc.deviceChannelId WHERE " + + "pgc.platformId=#{platformId} AND dc.channelId=#{channelId}") DeviceChannel queryChannelInParentPlatform(String platformId, String channelId); @@ -62,7 +62,7 @@ "where pgc.platformId=#{platformId} and pgc.catalogId=#{catalogId}") List<PlatformCatalog> queryChannelInParentPlatformAndCatalog(String platformId, String catalogId); - @Select("SELECT * FROM device WHERE deviceId = (SELECT deviceId FROM platform_gb_channel WHERE platformId='${platformId}' AND channelId='${channelId}')") + @Select("SELECT * FROM device WHERE deviceId = (SELECT deviceId FROM platform_gb_channel pgc left join device_channel dc on dc.id = pgc.deviceChannelId WHERE pgc.platformId='${platformId}' AND dc.channelId='${channelId}')") Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId); @Delete("<script> "+ @@ -71,7 +71,7 @@ int delByCatalogId(String id); @Delete("<script> "+ - "DELETE FROM platform_gb_channel WHERE catalogId=#{parentId} AND platformId=#{platformId} AND channelId=#{id}" + + "DELETE FROM platform_gb_channel WHERE catalogId=#{parentId} AND platformId=#{platformId} AND channelId=#{id}" + "</script>") int delByCatalogIdAndChannelIdAndPlatformId(PlatformCatalog platformCatalog); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 92fdf6c..1baefbe 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -139,7 +139,6 @@ @Override public StreamInfo queryPlayByDevice(String deviceId, String channelId) { -// List<Object> playLeys = redis.keys(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX, List<Object> playLeys = redis.scan(String.format("%S_%s_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX, userSetup.getServerId(), deviceId, diff --git a/src/test/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImplTest.java b/src/test/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImplTest.java index 3cb9aa5..23b9f6b 100644 --- a/src/test/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImplTest.java +++ b/src/test/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImplTest.java @@ -50,14 +50,7 @@ // System.out.println(deviceAlarmService.getAllAlarm(0, 10000, "11111111111111111111", null, "1", null, // null, null).getSize()); - System.out.println(deviceAlarmService.getAllAlarm(0, 10000, "11111111111111111111", null, null, null, - "2021-01-01 00:00:00", null).getSize()); - System.out.println(deviceAlarmService.getAllAlarm(0, 10000, "11111111111111111111", null, null, null, - null, "2021-04-01 09:00:00").getSize()); - - System.out.println(deviceAlarmService.getAllAlarm(0, 10000, "11111111111111111111", null, null, null, - "2021-02-01 01:00:00", "2021-04-01 04:00:00").getSize()); } -- Gitblit v1.8.0