From a9bdb0a706e10d2dffb50ae5a8086dd744bbd976 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 10 五月 2022 17:45:07 +0800
Subject: [PATCH] 优化语音广播流程

---
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java |   64 ++++++++++++---------
 src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java                               |   37 ++++++------
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java    |    8 +-
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java    |   15 ++++-
 4 files changed, 71 insertions(+), 53 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
index 6cc19a7..153a08a 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -4,9 +4,8 @@
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@@ -82,6 +81,9 @@
 	@Autowired
 	private ISIPCommanderForPlatform commanderForPlatform;
 
+	@Autowired
+	private AudioBroadcastManager audioBroadcastManager;
+
 
 	/**   
 	 * 澶勭悊  ACK璇锋眰
@@ -122,6 +124,13 @@
 			if (jsonObject == null) {
 				logger.error("RTP鎺ㄦ祦澶辫触: 璇锋鏌LM鏈嶅姟");
 			} else if (jsonObject.getInteger("code") == 0) {
+				if (sendRtpItem.isOnlyAudio()) {
+					AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+					audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok);
+					audioBroadcastCatch.setDialog((SIPDialog) evt.getDialog());
+					audioBroadcastCatch.setRequest((SIPRequest) evt.getRequest());
+					audioBroadcastManager.update(audioBroadcastCatch);
+				}
 				logger.info("RTP鎺ㄦ祦鎴愬姛[ {}/{} ]锛寋}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
 			} else {
 				logger.error("RTP鎺ㄦ祦澶辫触: {}, 鍙傛暟锛歿}",jsonObject.getString("msg"),JSONObject.toJSON(param));
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
index c6116e3..7944787 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -91,7 +91,7 @@
 			if (dialog.getState().equals(DialogState.TERMINATED)) {
 				String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
 				String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
-				SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
+				SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, null, null, callIdHeader.getCallId());
 				logger.info("鏀跺埌bye, [{}/{}]", platformGbId, channelId);
 				if (sendRtpItem != null){
 					String streamId = sendRtpItem.getStreamId();
@@ -103,15 +103,15 @@
 					logger.info("鏀跺埌bye:鍋滄鍚戜笂绾ф帹娴侊細" + streamId);
 					MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
 					zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
-					redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null);
+					redisCatchStorage.deleteSendRTPServer(platformGbId, sendRtpItem.getChannelId(), callIdHeader.getCallId(), null);
 					int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
 					if (totalReaderCount <= 0) {
 						logger.info("鏀跺埌bye: {} 鏃犲叾瀹冭鐪嬭�咃紝閫氱煡璁惧鍋滄鎺ㄦ祦", streamId);
 						if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
-							cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null);
+							cmder.streamByeCmd(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId(), streamId, null);
 						}
 						if (sendRtpItem.isOnlyAudio()) {
-							playService.stopAudioBroadcast(sendRtpItem.getDeviceId(), channelId);
+							playService.stopAudioBroadcast(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
 						}
 						if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
 							MessageForPushChannel messageForPushChannel = new MessageForPushChannel();
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index dc77afc..e3a9b27 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -713,6 +713,7 @@
 				String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + audioBroadcastCatch.getChannelId();
 				dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{
 					logger.info("绛夊緟鎺ㄦ祦瓒呮椂: {}/{}", app, stream);
+					subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
 					playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
 					// 鍙戦�乥ye
 					try {
@@ -728,35 +729,42 @@
 
 				subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
 						(MediaServerItem mediaServerItemInUse, JSONObject json)->{
-							sendRtpItem.setStatus(2);
-							dynamicTask.stop(waiteStreamTimeoutTaskKey);
-							redisCatchStorage.updateSendRTPSever(sendRtpItem);
-							StringBuffer content = new StringBuffer(200);
-							content.append("v=0\r\n");
-							content.append("o="+ audioBroadcastCatch.getChannelId() +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
-							content.append("s=Play\r\n");
-							content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
-							content.append("t=0 0\r\n");
-							content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
-							content.append("a=sendonly\r\n");
-							content.append("a=rtpmap:8 PCMA/8000\r\n");
-							content.append("y="+ finalSsrc + "\r\n");
-							content.append("f=v/////a/1/8/1\r\n");
+					logger.info("鏀跺埌璇煶瀵硅鎺ㄦ祦");
+					try {
+						sendRtpItem.setStatus(2);
+						redisCatchStorage.updateSendRTPSever(sendRtpItem);
+						StringBuffer content = new StringBuffer(200);
+						content.append("v=0\r\n");
+						content.append("o="+ config.getId() +" "+ sdp.getOrigin().getSessionId() +" " + sdp.getOrigin().getSessionVersion()  + " IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
+						content.append("s=Play\r\n");
+						content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
+						content.append("t=0 0\r\n");
+						content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
+						content.append("a=sendonly\r\n");
+						content.append("a=rtpmap:8 PCMA/8000\r\n");
+						content.append("y="+ finalSsrc + "\r\n");
+						content.append("f=v/////a/1/8/1\r\n");
 
-							ParentPlatform parentPlatform = new ParentPlatform();
-							parentPlatform.setServerIP(device.getIp());
-							parentPlatform.setServerPort(device.getPort());
-							parentPlatform.setServerGBId(device.getDeviceId());
-							try {
-								responseSdpAck(evt, content.toString(), parentPlatform);
-							} catch (SipException e) {
-								throw new RuntimeException(e);
-							} catch (InvalidArgumentException e) {
-								throw new RuntimeException(e);
-							} catch (ParseException e) {
-								throw new RuntimeException(e);
-							}
-						});
+						ParentPlatform parentPlatform = new ParentPlatform();
+						parentPlatform.setServerIP(device.getIp());
+						parentPlatform.setServerPort(device.getPort());
+						parentPlatform.setServerGBId(device.getDeviceId());
+
+						responseSdpAck(evt, content.toString(), parentPlatform);
+						Dialog dialog = evt.getDialog();
+						audioBroadcastCatch.setDialog((SIPDialog) dialog);
+						audioBroadcastCatch.setRequest((SIPRequest) request);
+						audioBroadcastManager.update(audioBroadcastCatch);
+					} catch (SipException e) {
+						throw new RuntimeException(e);
+					} catch (InvalidArgumentException e) {
+						throw new RuntimeException(e);
+					} catch (ParseException e) {
+						throw new RuntimeException(e);
+					} catch (SdpParseException e) {
+						throw new RuntimeException(e);
+					}
+				});
 			}
 			String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId();
 			WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>();
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index 934745e..27aa7a9 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -675,26 +675,27 @@
         AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId);
         if (audioBroadcastCatch != null) {
             audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId());
-        }
-        try {
-            SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(deviceId, channelId, null, null);
-            if (sendRtpItem != null) {
-                redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
-                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                Map<String, Object> param = new HashMap<>();
-                param.put("vhost", "__defaultVhost__");
-                param.put("app", sendRtpItem.getApp());
-                param.put("stream", sendRtpItem.getStreamId());
-                zlmresTfulUtils.stopSendRtp(mediaInfo, param);
+            try {
+                SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
+                if (sendRtpItem != null) {
+                    redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
+                    MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+                    Map<String, Object> param = new HashMap<>();
+                    param.put("vhost", "__defaultVhost__");
+                    param.put("app", sendRtpItem.getApp());
+                    param.put("stream", sendRtpItem.getStreamId());
+                    zlmresTfulUtils.stopSendRtp(mediaInfo, param);
+                }
+                if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) {
+                    cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getRequest(), null);
+                }
+            } catch (SipException e) {
+                throw new RuntimeException(e);
+            } catch (ParseException e) {
+                throw new RuntimeException(e);
             }
-            if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) {
-                cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getRequest(), null);
-            }
-        } catch (SipException e) {
-            throw new RuntimeException(e);
-        } catch (ParseException e) {
-            throw new RuntimeException(e);
         }
 
+
     }
 }

--
Gitblit v1.8.0