From c21d973977a9f1d00d26179de764687ddd0ec56c Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 24 四月 2024 14:59:41 +0800 Subject: [PATCH] 修复收到catalog消息是更新导致是否有音频的设置失效的BUG --- src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java | 578 ++++++++++++++++++++++++++++++++++++++++++++------------- 1 files changed, 444 insertions(+), 134 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java old mode 100644 new mode 100755 index a1f3d55..06c621e --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -1,51 +1,69 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.InviteInfo; +import com.genersoft.iot.vmp.common.InviteSessionStatus; +import com.genersoft.iot.vmp.common.InviteSessionType; +import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; -import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlatformService; import com.genersoft.iot.vmp.service.IPlayService; -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; -import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; +import com.genersoft.iot.vmp.storager.dao.*; +import com.genersoft.iot.vmp.utils.DateUtil; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; +import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.sdp.*; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; +import javax.sip.PeerUnavailableException; import javax.sip.SipException; import java.text.ParseException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.*; /** * @author lin */ @Service +@DS("master") public class PlatformServiceImpl implements IPlatformService { private final static String REGISTER_KEY_PREFIX = "platform_register_"; + + private final static String REGISTER_FAIL_AGAIN_KEY_PREFIX = "platform_register_fail_again_"; private final static String KEEPALIVE_KEY_PREFIX = "platform_keepalive_"; private final static Logger logger = LoggerFactory.getLogger(PlatformServiceImpl.class); @@ -57,6 +75,9 @@ private IRedisCatchStorage redisCatchStorage; @Autowired + private SSRCFactory ssrcFactory; + + @Autowired private IMediaServerService mediaServerService; @Autowired @@ -66,7 +87,7 @@ private DynamicTask dynamicTask; @Autowired - private ZLMRTPServerFactory zlmrtpServerFactory; + private ZLMServerFactory zlmServerFactory; @Autowired private SubscribeHolder subscribeHolder; @@ -87,6 +108,11 @@ @Autowired private IPlayService playService; + @Autowired + private IInviteStreamService inviteStreamService; + + @Autowired + private ZLMRESTfulUtils zlmresTfulUtils; @Override @@ -135,98 +161,183 @@ } @Override - public void online(ParentPlatform parentPlatform) { - logger.info("[鍥芥爣绾ц仈]锛歿}, 骞冲彴涓婄嚎/鏇存柊娉ㄥ唽", parentPlatform.getServerGBId()); + public boolean update(ParentPlatform parentPlatform) { + logger.info("[鍥芥爣绾ц仈]鏇存柊骞冲彴 {}", parentPlatform.getDeviceGBId()); + parentPlatform.setCharacterSet(parentPlatform.getCharacterSet().toUpperCase()); + ParentPlatform parentPlatformOld = platformMapper.getParentPlatById(parentPlatform.getId()); + ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatformOld.getServerGBId()); + parentPlatform.setUpdateTime(DateUtil.getNow()); + + // 鍋滄蹇冭烦瀹氭椂 + final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatformOld.getServerGBId(); + dynamicTask.stop(keepaliveTaskKey); + // 鍋滄娉ㄥ唽瀹氭椂 + final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatformOld.getServerGBId(); + dynamicTask.stop(registerTaskKey); + // 娉ㄩ攢鏃х殑 + try { + if (parentPlatformOld.isStatus() && parentPlatformCatchOld != null) { + logger.info("淇濆瓨骞冲彴{}鏃跺彂鐜版棫骞冲彴鍦ㄧ嚎锛屽彂閫佹敞閿�鍛戒护", parentPlatformOld.getServerGBId()); + commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> { + logger.info("[鍥芥爣绾ц仈] 娉ㄩ攢鎴愬姛锛� 骞冲彴锛歿}", parentPlatformOld.getServerGBId()); + }); + } + } catch (InvalidArgumentException | ParseException | SipException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 娉ㄩ攢: {}", e.getMessage()); + } + + // 鏇存柊鏁版嵁搴� + if (parentPlatform.getCatalogGroup() == 0) { + parentPlatform.setCatalogGroup(1); + } + if (parentPlatform.getAdministrativeDivision() == null) { + parentPlatform.setAdministrativeDivision(parentPlatform.getAdministrativeDivision()); + } + + platformMapper.updateParentPlatform(parentPlatform); + // 鏇存柊redis + redisCatchStorage.delPlatformCatchInfo(parentPlatformOld.getServerGBId()); + ParentPlatformCatch parentPlatformCatch = new ParentPlatformCatch(); + parentPlatformCatch.setParentPlatform(parentPlatform); + parentPlatformCatch.setId(parentPlatform.getServerGBId()); + redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); + // 娉ㄥ唽 + if (parentPlatform.isEnable()) { + // 淇濆瓨鏃跺惎鐢ㄥ氨鍙戦�佹敞鍐� + // 娉ㄥ唽鎴愬姛鏃剁敱绋嬪簭鐩存帴璋冪敤浜唎nline鏂规硶 + try { + logger.info("[鍥芥爣绾ц仈] 骞冲彴娉ㄥ唽 {}", parentPlatform.getDeviceGBId()); + commanderForPlatform.register(parentPlatform, eventResult -> { + logger.info("[鍥芥爣绾ц仈] {},娣诲姞鍚戜笂绾ф敞鍐屽け璐ワ紝璇风‘瀹氫笂绾у钩鍙板彲鐢ㄦ椂閲嶆柊淇濆瓨", parentPlatform.getServerGBId()); + }, null); + } catch (InvalidArgumentException | ParseException | SipException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈: {}", e.getMessage()); + } + } + + + return false; + } + + + @Override + public void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo) { + logger.info("[鍥芥爣绾ц仈]锛歿}, 骞冲彴涓婄嚎", parentPlatform.getServerGBId()); + final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId(); + dynamicTask.stop(registerFailAgainTaskKey); + platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true); ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); - if (parentPlatformCatch != null) { - parentPlatformCatch.getParentPlatform().setStatus(true); - redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); - }else { + if (parentPlatformCatch == null) { parentPlatformCatch = new ParentPlatformCatch(); parentPlatformCatch.setParentPlatform(parentPlatform); parentPlatformCatch.setId(parentPlatform.getServerGBId()); parentPlatform.setStatus(true); parentPlatformCatch.setParentPlatform(parentPlatform); - redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); } + + parentPlatformCatch.getParentPlatform().setStatus(true); + parentPlatformCatch.setSipTransactionInfo(sipTransactionInfo); + redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId(); if (!dynamicTask.isAlive(registerTaskKey)) { + logger.info("[鍥芥爣绾ц仈]锛歿}, 娣诲姞瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId()); // 娣诲姞娉ㄥ唽浠诲姟 dynamicTask.startCron(registerTaskKey, // 娉ㄥ唽澶辫触锛堟敞鍐屾垚鍔熸椂鐢辩▼搴忕洿鎺ヨ皟鐢ㄤ簡online鏂规硶锛� - ()-> { - registerTask(parentPlatform); - }, - (parentPlatform.getExpires() - 10) *1000); + ()-> registerTask(parentPlatform, sipTransactionInfo), + parentPlatform.getExpires() * 1000); } final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId(); if (!dynamicTask.contains(keepaliveTaskKey)) { + logger.info("[鍥芥爣绾ц仈]锛歿}, 娣诲姞瀹氭椂蹇冭烦浠诲姟", parentPlatform.getServerGBId()); // 娣诲姞蹇冭烦浠诲姟 dynamicTask.startCron(keepaliveTaskKey, ()-> { try { commanderForPlatform.keepalive(parentPlatform, eventResult -> { // 蹇冭烦澶辫触 - if (eventResult.type == SipSubscribe.EventResultType.timeout) { - // 蹇冭烦瓒呮椂 - ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); - // 姝ゆ椂鏄涓夋蹇冭烦瓒呮椂锛� 骞冲彴绂荤嚎 - if (platformCatch.getKeepAliveReply() == 2) { - // 璁剧疆骞冲彴绂荤嚎锛屽苟閲嶆柊娉ㄥ唽 - logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽", parentPlatform.getServerGBId()); - try { - commanderForPlatform.register(parentPlatform, eventResult1 -> { - logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽浠嶇劧澶辫触锛屽紑濮嬪畾鏃跺彂璧锋敞鍐岋紝闂撮殧涓�1鍒嗛挓", parentPlatform.getServerGBId()); - offline(parentPlatform, false); - }, null); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 娉ㄥ唽: {}", e.getMessage()); - } - } - - }else { + if (eventResult.type != SipSubscribe.EventResultType.timeout) { logger.warn("[鍥芥爣绾ц仈]鍙戦�佸績璺虫敹鍒伴敊璇紝code锛� {}, msg: {}", eventResult.statusCode, eventResult.msg); + } + // 蹇冭烦澶辫触 + ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); + // 姝ゆ椂鏄涓夋蹇冭烦瓒呮椂锛� 骞冲彴绂荤嚎 + if (platformCatch.getKeepAliveReply() == 2) { + // 璁剧疆骞冲彴绂荤嚎锛屽苟閲嶆柊娉ㄥ唽 + logger.info("[鍥芥爣绾ц仈] 涓夋蹇冭烦澶辫触, 骞冲彴{}({})绂荤嚎", parentPlatform.getName(), parentPlatform.getServerGBId()); + offline(parentPlatform, false); + }else { + platformCatch.setKeepAliveReply(platformCatch.getKeepAliveReply() + 1); + redisCatchStorage.updatePlatformCatchInfo(platformCatch); } }, eventResult -> { // 蹇冭烦鎴愬姛 // 娓呯┖涔嬪墠鐨勫績璺宠秴鏃惰鏁� ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); - if (platformCatch.getKeepAliveReply() > 0) { + if (platformCatch != null && platformCatch.getKeepAliveReply() > 0) { platformCatch.setKeepAliveReply(0); redisCatchStorage.updatePlatformCatchInfo(platformCatch); } + logger.info("[鍙戦�佸績璺砞 鍥芥爣绾ц仈 鍙戦�佸績璺�, code锛� {}, msg: {}", eventResult.statusCode, eventResult.msg); }); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�佸績璺�: {}", e.getMessage()); } }, - (parentPlatform.getKeepTimeout() - 10)*1000); + (parentPlatform.getKeepTimeout())*1000); + } + if (parentPlatform.isAutoPushChannel()) { + if (subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()) == null) { + logger.info("[鍥芥爣绾ц仈]锛歿}, 娣诲姞鑷姩閫氶亾鎺ㄩ�佹ā鎷熻闃呬俊鎭�", parentPlatform.getServerGBId()); + addSimulatedSubscribeInfo(parentPlatform); + } + }else { + SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()); + if (catalogSubscribe != null && catalogSubscribe.getExpires() == -1) { + subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId()); + } } } - private void registerTask(ParentPlatform parentPlatform){ + @Override + public void addSimulatedSubscribeInfo(ParentPlatform parentPlatform) { + // 鑷姩娣诲姞涓�鏉℃ā鎷熺殑璁㈤槄淇℃伅 + SubscribeInfo subscribeInfo = new SubscribeInfo(); + subscribeInfo.setId(parentPlatform.getServerGBId()); + subscribeInfo.setExpires(-1); + subscribeInfo.setEventType("Catalog"); + int random = (int) Math.floor(Math.random() * 10000); + subscribeInfo.setEventId(random + ""); + subscribeInfo.setSimulatedCallId(UUID.randomUUID().toString().replace("-", "") + "@" + parentPlatform.getServerIP()); + subscribeInfo.setSimulatedFromTag(UUID.randomUUID().toString().replace("-", "")); + subscribeInfo.setSimulatedToTag(UUID.randomUUID().toString().replace("-", "")); + subscribeHolder.putCatalogSubscribe(parentPlatform.getServerGBId(), subscribeInfo); + } + + private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){ try { - // 璁剧疆瓒呮椂閲嶅彂锛� 鍚庣画浠庡簳灞傛敮鎸佹秷鎭噸鍙� - String key = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId() + "_timeout"; - if (dynamicTask.isAlive(key)) { - return; + // 涓嶅湪鍚屼竴涓細璇濅腑缁鍒欐瘡娆″叏鏂版敞鍐� + if (!userSetting.isRegisterKeepIntDialog()) { + sipTransactionInfo = null; } - dynamicTask.startDelay(key, ()->{ - registerTask(parentPlatform); - }, 1000); - logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛岄噸鏂版敞鍐�", parentPlatform.getServerGBId()); - commanderForPlatform.register(parentPlatform, eventResult -> { - dynamicTask.stop(key); + + if (sipTransactionInfo == null) { + logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛屽紑濮嬮噸鏂版敞鍐�", parentPlatform.getServerGBId()); + }else { + logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛屽紑濮嬬画璁�", parentPlatform.getServerGBId()); + } + + commanderForPlatform.register(parentPlatform, sipTransactionInfo, eventResult -> { + logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽澶辫触锛寋}:{}", parentPlatform.getServerGBId(), + eventResult.statusCode, eventResult.msg); offline(parentPlatform, false); - },eventResult -> { - dynamicTask.stop(key); - }); - } catch (InvalidArgumentException | ParseException | SipException e) { + }, null); + } catch (Exception e) { logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈瀹氭椂娉ㄥ唽: {}", e.getMessage()); } } @@ -246,37 +357,55 @@ // 鍋滄鎵�鏈夋帹娴� logger.info("[骞冲彴绂荤嚎] {}, 鍋滄鎵�鏈夋帹娴�", parentPlatform.getServerGBId()); stopAllPush(parentPlatform.getServerGBId()); - if (stopRegister) { - // 娓呴櫎娉ㄥ唽瀹氭椂 - logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId()); - final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId(); - if (dynamicTask.contains(registerTaskKey)) { - dynamicTask.stop(registerTaskKey); - } + + // 娓呴櫎娉ㄥ唽瀹氭椂 + logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId()); + final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId(); + if (dynamicTask.contains(registerTaskKey)) { + dynamicTask.stop(registerTaskKey); } // 娓呴櫎蹇冭烦瀹氭椂 logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂鍙戦�佸績璺充换鍔�", parentPlatform.getServerGBId()); final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId(); if (dynamicTask.contains(keepaliveTaskKey)) { - // 娣诲姞蹇冭烦浠诲姟 + // 娓呴櫎蹇冭烦浠诲姟 dynamicTask.stop(keepaliveTaskKey); } - // 鍋滄鐩綍璁㈤槄鍥炲 - logger.info("[骞冲彴绂荤嚎] {}, 鍋滄璁㈤槄鍥炲", parentPlatform.getServerGBId()); - subscribeHolder.removeAllSubscribe(parentPlatform.getServerGBId()); + // 鍋滄璁㈤槄鍥炲 + SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()); + if (catalogSubscribe != null) { + if (catalogSubscribe.getExpires() > 0) { + logger.info("[骞冲彴绂荤嚎] {}, 鍋滄鐩綍璁㈤槄鍥炲", parentPlatform.getServerGBId()); + subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId()); + } + } + logger.info("[骞冲彴绂荤嚎] {}, 鍋滄绉诲姩浣嶇疆璁㈤槄鍥炲", parentPlatform.getServerGBId()); + subscribeHolder.removeMobilePositionSubscribe(parentPlatform.getServerGBId()); + // 鍙戣捣瀹氭椂鑷姩閲嶆柊娉ㄥ唽 + if (!stopRegister) { + // 璁剧疆涓�60绉掕嚜鍔ㄥ皾璇曢噸鏂版敞鍐� + final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId(); + ParentPlatform platform = platformMapper.getParentPlatById(parentPlatform.getId()); + if (platform.isEnable()) { + dynamicTask.startCron(registerFailAgainTaskKey, + ()-> registerTask(platform, null), + userSetting.getRegisterAgainAfterTime() * 1000); + } + } } private void stopAllPush(String platformId) { List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(platformId); if (sendRtpItems != null && sendRtpItems.size() > 0) { for (SendRtpItem sendRtpItem : sendRtpItems) { + ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); Map<String, Object> param = new HashMap<>(3); param.put("vhost", "__defaultVhost__"); param.put("app", sendRtpItem.getApp()); - param.put("stream", sendRtpItem.getStreamId()); - zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); + param.put("stream", sendRtpItem.getStream()); + zlmServerFactory.stopSendRtpStream(mediaInfo, param); } } } @@ -341,21 +470,23 @@ logger.info("[鍥芥爣绾ц仈] 璇煶鍠婅瘽鏈壘鍒板彲鐢ㄧ殑zlm. platform: {}", platform.getServerGBId()); return; } - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(platform.getServerGBId(), channelId); - if (streamInfo != null) { + InviteInfo inviteInfoForOld = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, platform.getServerGBId(), channelId); + + if (inviteInfoForOld != null && inviteInfoForOld.getStreamInfo() != null) { // 濡傛灉zlm涓嶅瓨鍦ㄨ繖涓祦锛屽垯鍒犻櫎鏁版嵁鍗冲彲 - MediaServerItem mediaServerItemForStreamInfo = mediaServerService.getOne(streamInfo.getMediaServerId()); + MediaServerItem mediaServerItemForStreamInfo = mediaServerService.getOne(inviteInfoForOld.getStreamInfo().getMediaServerId()); if (mediaServerItemForStreamInfo != null) { - Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemForStreamInfo, streamInfo.getApp(), streamInfo.getStream()); + Boolean ready = zlmServerFactory.isStreamReady(mediaServerItemForStreamInfo, inviteInfoForOld.getStreamInfo().getApp(), inviteInfoForOld.getStreamInfo().getStream()); if (!ready) { // 閿欒瀛樺湪浜巖edis涓殑鏁版嵁 - redisCatchStorage.stopPlay(streamInfo); + inviteStreamService.removeInviteInfo(inviteInfoForOld); }else { // 娴佺‘瀹炲皻鍦ㄦ帹娴侊紝鐩存帴鍥炶皟缁撴灉 - JSONObject json = new JSONObject(); - json.put("app", streamInfo.getApp()); - json.put("stream", streamInfo.getStream()); - hookEvent.response(mediaServerItemForStreamInfo, json); + OnStreamChangedHookParam hookParam = new OnStreamChangedHookParam(); + hookParam.setApp(inviteInfoForOld.getStreamInfo().getApp()); + hookParam.setStream(inviteInfoForOld.getStreamInfo().getStream()); + + hookEvent.response(mediaServerItemForStreamInfo, hookParam); return; } } @@ -367,19 +498,37 @@ } // 榛樿涓嶈繘琛孲SRC鏍¢獙锛� TODO 鍚庣画鍙敼涓洪厤缃� boolean ssrcCheck = false; - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true); + int tcpMode; + if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-PASSIVE")) { + tcpMode = 1; + }else if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-ACTIVE")) { + tcpMode = 2; + } else { + tcpMode = 0; + } + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true, false, tcpMode); if (ssrcInfo == null || ssrcInfo.getPort() < 0) { logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 寮�鍚鍙g洃鍚け璐ワ紝 platform: {}, channel锛� {}", platform.getServerGBId(), channelId); - errorEvent.response(new SipSubscribe.EventResult(-1, "绔彛鐩戝惉澶辫触")); + SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>(); + eventResult.statusCode = -1; + eventResult.msg = "绔彛鐩戝惉澶辫触"; + eventResult.type = SipSubscribe.EventResultType.failedToGetPort; + errorEvent.response(eventResult); return; } - logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 deviceId: {}, channelId: {},鏀舵祦绔彛锛� {}, 鏀舵祦妯″紡锛歿}, SSRC: {}, SSRC鏍¢獙锛歿}", + logger.info("[鍥芥爣绾ц仈] 璇煶鍠婅瘽锛屽彂璧稩nvite娑堟伅 deviceId: {}, channelId: {},鏀舵祦绔彛锛� {}, 鏀舵祦妯″紡锛歿}, SSRC: {}, SSRC鏍¢獙锛歿}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck); + // 鍒濆鍖杛edis涓殑invite娑堟伅鐘舵�� + InviteInfo inviteInfo = InviteInfo.getInviteInfo(platform.getServerGBId(), channelId, ssrcInfo.getStream(), ssrcInfo, + mediaServerItem.getSdpIp(), ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), InviteSessionType.BROADCAST, + InviteSessionStatus.ready); + inviteStreamService.updateInviteInfo(inviteInfo); String timeOutTaskKey = UUID.randomUUID().toString(); dynamicTask.startDelay(timeOutTaskKey, () -> { // 鎵ц瓒呮椂浠诲姟鏃舵煡璇㈡槸鍚﹀凡缁忔垚鍔燂紝鎴愬姛浜嗗垯涓嶆墽琛岃秴鏃朵换鍔★紝闃叉瓒呮椂浠诲姟鍙栨秷澶辫触鐨勬儏鍐� - if (redisCatchStorage.queryPlayByDevice(platform.getServerGBId(), channelId) == null) { + InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, platform.getServerGBId(), channelId, null); + if (inviteInfoForBroadcast == null) { logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 鏀舵祦瓒呮椂 deviceId: {}, channelId: {}锛岀鍙o細{}, SSRC: {}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc()); // 鐐规挱瓒呮椂鍥炲BYE 鍚屾椂閲婃斁ssrc浠ュ強姝ゆ鐐规挱鐨勮祫婧� try { @@ -395,65 +544,58 @@ } } }, userSetting.getPlayTimeout()); - commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, response)->{ + commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, hookParam)->{ + logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 鏀跺埌涓婄骇鎺ㄦ祦 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId); dynamicTask.stop(timeOutTaskKey); // hook鍝嶅簲 - playService.onPublishHandlerForPlay(mediaServerItemForInvite, response, platform.getServerGBId(), channelId); + playService.onPublishHandlerForPlay(mediaServerItemForInvite, hookParam, platform.getServerGBId(), channelId); // 鏀跺埌娴� if (hookEvent != null) { - hookEvent.response(mediaServerItem, response); + hookEvent.response(mediaServerItem, hookParam); } }, event -> { - // 鏀跺埌200OK 妫�娴媠src鏄惁鏈夊彉鍖栵紝闃叉涓婄骇鑷畾涔変簡ssrc - ResponseEvent responseEvent = (ResponseEvent) event.event; - String contentString = new String(responseEvent.getResponse().getRawContent()); - // 鑾峰彇ssrc - int ssrcIndex = contentString.indexOf("y="); - // 妫�鏌ユ槸鍚︽湁y瀛楁 - if (ssrcIndex >= 0) { - //ssrc瑙勫畾闀垮害涓�10瀛楄妭锛屼笉鍙栦綑涓嬮暱搴︿互閬垮厤鍚庣画杩樻湁鈥渇=鈥濆瓧娈� TODO 鍚庣画瀵逛笉瑙勮寖鐨勯潪10浣峴src鍏煎 - String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - // 鏌ヨ鍒皊src涓嶄竴鑷翠笖寮�鍚簡ssrc鏍¢獙鍒欓渶瑕侀拡瀵瑰鐞� - if (ssrcInfo.getSsrc().equals(ssrcInResponse) || ssrcCheck) { - return; - } - logger.info("[鐐规挱娑堟伅] 鏀跺埌invite 200, 鍙戠幇涓嬬骇鑷畾涔変簡ssrc: {}", ssrcInResponse); - if (!mediaServerItem.isRtpEnable()) { - logger.info("[鐐规挱娑堟伅] SSRC淇 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) { - // ssrc 涓嶅彲鐢� - // 閲婃斁ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); - event.msg = "涓嬬骇鑷畾涔変簡ssrc,浣嗘槸姝src涓嶅彲鐢�"; - event.statusCode = 400; - errorEvent.response(event); - return; - } - - // 鍗曠鍙fā寮弒treamId涔熸湁鍙樺寲锛岄渶瑕侀噸鏂拌缃洃鍚� - if (!mediaServerItem.isRtpEnable()) { - // 娣诲姞璁㈤槄 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); - subscribe.removeSubscribe(hookSubscribe); - hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { - logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString()); - dynamicTask.stop(timeOutTaskKey); - // hook鍝嶅簲 - playService.onPublishHandlerForPlay(mediaServerItemInUse, response, platform.getServerGBId(), channelId); - hookEvent.response(mediaServerItemInUse, response); - }); - } - // 鍏抽棴rtp server - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - // 閲嶆柊寮�鍚痵src server - mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, false, false, ssrcInfo.getPort(), true); - - - } - } + inviteOKHandler(event, ssrcInfo, tcpMode, ssrcCheck, mediaServerItem, platform, channelId, timeOutTaskKey, + null, inviteInfo, InviteSessionType.BROADCAST); +// // 鏀跺埌200OK 妫�娴媠src鏄惁鏈夊彉鍖栵紝闃叉涓婄骇鑷畾涔変簡ssrc +// ResponseEvent responseEvent = (ResponseEvent) event.event; +// String contentString = new String(responseEvent.getResponse().getRawContent()); +// // 鑾峰彇ssrc +// int ssrcIndex = contentString.indexOf("y="); +// // 妫�鏌ユ槸鍚︽湁y瀛楁 +// if (ssrcIndex >= 0) { +// //ssrc瑙勫畾闀垮害涓�10瀛楄妭锛屼笉鍙栦綑涓嬮暱搴︿互閬垮厤鍚庣画杩樻湁鈥渇=鈥濆瓧娈� TODO 鍚庣画瀵逛笉瑙勮寖鐨勯潪10浣峴src鍏煎 +// String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); +// // 鏌ヨ鍒皊src涓嶄竴鑷翠笖寮�鍚簡ssrc鏍¢獙鍒欓渶瑕侀拡瀵瑰鐞� +// if (ssrcInfo.getSsrc().equals(ssrcInResponse) || ssrcCheck) { +// tcpActiveHandler(platform, ) +// return; +// } +// logger.info("[鐐规挱娑堟伅] 鏀跺埌invite 200, 鍙戠幇涓嬬骇鑷畾涔変簡ssrc: {}", ssrcInResponse); +// if (!mediaServerItem.isRtpEnable()) { +// logger.info("[鐐规挱娑堟伅] SSRC淇 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); +// // 閲婃斁ssrc +// mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); +// // 鍗曠鍙fā寮弒treamId涔熸湁鍙樺寲锛岄渶瑕侀噸鏂拌缃洃鍚� +// if (!mediaServerItem.isRtpEnable()) { +// // 娣诲姞璁㈤槄 +// HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); +// subscribe.removeSubscribe(hookSubscribe); +// hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); +// subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { +// logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + hookParam); +// dynamicTask.stop(timeOutTaskKey); +// // hook鍝嶅簲 +// playService.onPublishHandlerForPlay(mediaServerItemInUse, hookParam, platform.getServerGBId(), channelId); +// hookEvent.response(mediaServerItemInUse, hookParam); +// }); +// } +// // 鍏抽棴rtp server +// mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); +// // 閲嶆柊寮�鍚痵src server +// mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, false, false, ssrcInfo.getPort(), true, false, tcpMode); +// } +// } }, eventResult -> { // 鏀跺埌閿欒鍥炲 if (errorEvent != null) { @@ -462,8 +604,176 @@ }); } + private void inviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, int tcpMode, boolean ssrcCheck, MediaServerItem mediaServerItem, + ParentPlatform platform, String channelId, String timeOutTaskKey, ErrorCallback<Object> callback, + InviteInfo inviteInfo, InviteSessionType inviteSessionType){ + inviteInfo.setStatus(InviteSessionStatus.ok); + ResponseEvent responseEvent = (ResponseEvent) eventResult.event; + String contentString = new String(responseEvent.getResponse().getRawContent()); + System.out.println(1111); + System.out.println(contentString); + String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString); + // 鍏煎鍥炲鐨勬秷鎭腑缂哄皯ssrc(y瀛楁)鐨勬儏鍐� + if (ssrcInResponse == null) { + ssrcInResponse = ssrcInfo.getSsrc(); + } + if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { + // ssrc 涓�鑷� + if (mediaServerItem.isRtpEnable()) { + // 澶氱鍙� + if (tcpMode == 2) { + tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck, + timeOutTaskKey, ssrcInfo, callback); + } + }else { + // 鍗曠鍙� + if (tcpMode == 2) { + logger.warn("[Invite 200OK] 鍗曠鍙f敹娴佹ā寮忎笉鏀寔tcp涓诲姩妯″紡鏀舵祦"); + } + } + }else { + logger.info("[Invite 200OK] 鏀跺埌invite 200, 鍙戠幇涓嬬骇鑷畾涔変簡ssrc: {}", ssrcInResponse); + // ssrc 涓嶄竴鑷� + if (mediaServerItem.isRtpEnable()) { + // 澶氱鍙� + if (ssrcCheck) { + // ssrc妫�楠� + // 鏇存柊ssrc + logger.info("[Invite 200OK] SSRC淇 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); + // 閲婃斁ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse); + if (!result) { + try { + logger.warn("[Invite 200OK] 鏇存柊ssrc澶辫触锛屽仠姝㈠枈璇� {}/{}", platform.getServerGBId(), channelId); + commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null); + } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍋滄鎾斁锛� 鍙戦�丅YE: {}", e.getMessage()); + } + + dynamicTask.stop(timeOutTaskKey); + // 閲婃斁ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + + streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); + + callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), + "涓嬬骇鑷畾涔変簡ssrc,閲嶆柊璁剧疆鏀舵祦淇℃伅澶辫触", null); + inviteStreamService.call(inviteSessionType, platform.getServerGBId(), channelId, null, + InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), + "涓嬬骇鑷畾涔変簡ssrc,閲嶆柊璁剧疆鏀舵祦淇℃伅澶辫触", null); + + }else { + ssrcInfo.setSsrc(ssrcInResponse); + inviteInfo.setSsrcInfo(ssrcInfo); + inviteInfo.setStream(ssrcInfo.getStream()); + if (tcpMode == 2) { + if (mediaServerItem.isRtpEnable()) { + tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck, + timeOutTaskKey, ssrcInfo, callback); + }else { + logger.warn("[Invite 200OK] 鍗曠鍙f敹娴佹ā寮忎笉鏀寔tcp涓诲姩妯″紡鏀舵祦"); + } + } + inviteStreamService.updateInviteInfo(inviteInfo); + } + }else { + ssrcInfo.setSsrc(ssrcInResponse); + inviteInfo.setSsrcInfo(ssrcInfo); + inviteInfo.setStream(ssrcInfo.getStream()); + if (tcpMode == 2) { + if (mediaServerItem.isRtpEnable()) { + tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck, + timeOutTaskKey, ssrcInfo, callback); + }else { + logger.warn("[Invite 200OK] 鍗曠鍙f敹娴佹ā寮忎笉鏀寔tcp涓诲姩妯″紡鏀舵祦"); + } + } + inviteStreamService.updateInviteInfo(inviteInfo); + } + }else { + if (ssrcInResponse != null) { + // 鍗曠鍙� + // 閲嶆柊璁㈤槄娴佷笂绾� + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getDeviceId(), + inviteInfo.getChannelId(), null, inviteInfo.getStream()); + streamSession.remove(inviteInfo.getDeviceId(), + inviteInfo.getChannelId(), inviteInfo.getStream()); + inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse); + streamSession.put(platform.getServerGBId(), channelId, ssrcTransaction.getCallId(), + inviteInfo.getStream(), ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType); + } + } + } + } + + + private void tcpActiveHandler(ParentPlatform platform, String channelId, String contentString, + MediaServerItem mediaServerItem, int tcpMode, boolean ssrcCheck, + String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback<Object> callback){ + if (tcpMode != 2) { + return; + } + + String substring; + if (contentString.indexOf("y=") > 0) { + substring = contentString.substring(0, contentString.indexOf("y=")); + }else { + substring = contentString; + } + try { + SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); + int port = -1; + Vector mediaDescriptions = sdp.getMediaDescriptions(true); + for (Object description : mediaDescriptions) { + MediaDescription mediaDescription = (MediaDescription) description; + Media media = mediaDescription.getMedia(); + + Vector mediaFormats = media.getMediaFormats(false); + if (mediaFormats.contains("8") || mediaFormats.contains("0")) { + port = media.getMediaPort(); + break; + } + } + logger.info("[TCP涓诲姩杩炴帴瀵规柟] serverGbId: {}, channelId: {}, 杩炴帴瀵规柟鐨勫湴鍧�锛歿}:{}, SSRC: {}, SSRC鏍¢獙锛歿}", + platform.getServerGBId(), channelId, sdp.getConnection().getAddress(), port, ssrcInfo.getSsrc(), ssrcCheck); + JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream()); + logger.info("[TCP涓诲姩杩炴帴瀵规柟] 缁撴灉锛� {}", jsonObject); + } catch (SdpException e) { + logger.error("[TCP涓诲姩杩炴帴瀵规柟] serverGbId: {}, channelId: {}, 瑙f瀽200OK鐨凷DP淇℃伅澶辫触", platform.getServerGBId(), channelId, e); + dynamicTask.stop(timeOutTaskKey); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + // 閲婃斁ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + + streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); + + callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), + InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); + inviteStreamService.call(InviteSessionType.PLAY, platform.getServerGBId(), channelId, null, + InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), + InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); + } + } + @Override - public void stopBroadcast(ParentPlatform platform, String channelId, String stream) throws InvalidArgumentException, ParseException, SsrcTransactionNotFoundException, SipException { - commanderForPlatform.streamByeCmd(platform, channelId, stream, null, null); + public void stopBroadcast(ParentPlatform platform, DeviceChannel channel, String stream, boolean sendBye, MediaServerItem mediaServerItem) { + + try { + if (sendBye) { + commanderForPlatform.streamByeCmd(platform, channel.getChannelId(), stream, null, null); + } + } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { + logger.warn("[娑堟伅鍙戦�佸け璐 鍋滄璇煶瀵硅锛� 骞冲彴锛歿}锛岄�氶亾锛歿}", platform.getId(), channel.getChannelId() ); + } finally { + mediaServerService.closeRTPServer(mediaServerItem, stream); + InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, platform.getServerGBId(), channel.getChannelId(), stream); + if (inviteInfo != null) { + // 閲婃斁ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), inviteInfo.getSsrcInfo().getSsrc()); + inviteStreamService.removeInviteInfo(inviteInfo); + } + streamSession.remove(platform.getServerGBId(), channel.getChannelId(), stream); + } } } -- Gitblit v1.8.0