From d9cfe061b9b501511f5d769f751c8ff6bbcb1bf9 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 17 八月 2023 15:20:25 +0800 Subject: [PATCH] 优化对讲释放流程 --- src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java | 160 ++++++++++++++++++++++++++--------------------------- 1 files changed, 78 insertions(+), 82 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index 9ebe789..dbde373 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.service.impl; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.conf.DynamicTask; @@ -11,11 +10,12 @@ import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; -import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlatformService; @@ -49,21 +49,14 @@ public class PlatformServiceImpl implements IPlatformService { private final static String REGISTER_KEY_PREFIX = "platform_register_"; + + private final static String REGISTER_FAIL_AGAIN_KEY_PREFIX = "platform_register_fail_again_"; private final static String KEEPALIVE_KEY_PREFIX = "platform_keepalive_"; private final static Logger logger = LoggerFactory.getLogger(PlatformServiceImpl.class); @Autowired private ParentPlatformMapper platformMapper; - - @Autowired - private PlatformCatalogMapper catalogMapper; - - @Autowired - private PlatformChannelMapper platformChannelMapper; - - @Autowired - private PlatformGbStreamMapper platformGbStreamMapper; @Autowired private IRedisCatchStorage redisCatchStorage; @@ -81,7 +74,7 @@ private DynamicTask dynamicTask; @Autowired - private ZLMRTPServerFactory zlmrtpServerFactory; + private ZLMServerFactory zlmServerFactory; @Autowired private SubscribeHolder subscribeHolder; @@ -158,14 +151,6 @@ ParentPlatform parentPlatformOld = platformMapper.getParentPlatById(parentPlatform.getId()); ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatformOld.getServerGBId()); parentPlatform.setUpdateTime(DateUtil.getNow()); - if (!parentPlatformOld.getTreeType().equals(parentPlatform.getTreeType())) { - // 鐩綍缁撴瀯鍙戠敓鍙樺寲锛屾竻绌轰箣鍓嶇殑鍏宠仈鍏崇郴 - logger.info("淇濆瓨骞冲彴{}鏃跺彂鐜扮洰褰曠粨鏋勫彉鍖栵紝娓呯┖鍏宠仈鍏崇郴", parentPlatform.getDeviceGBId()); - catalogMapper.delByPlatformId(parentPlatformOld.getServerGBId()); - platformChannelMapper.delByPlatformId(parentPlatformOld.getServerGBId()); - platformGbStreamMapper.delByPlatformId(parentPlatformOld.getServerGBId()); - } - // 鍋滄蹇冭烦瀹氭椂 final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatformOld.getServerGBId(); @@ -176,12 +161,11 @@ // 娉ㄩ攢鏃х殑 try { if (parentPlatformOld.isStatus()) { - logger.info("淇濆瓨骞冲彴{}鏃跺彂鐜版晳骞冲彴鍦ㄧ嚎锛屽彂閫佹敞閿�鍛戒护", parentPlatformOld.getServerGBId()); + logger.info("淇濆瓨骞冲彴{}鏃跺彂鐜版棫骞冲彴鍦ㄧ嚎锛屽彂閫佹敞閿�鍛戒护", parentPlatformOld.getServerGBId()); commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> { logger.info("[鍥芥爣绾ц仈] 娉ㄩ攢鎴愬姛锛� 骞冲彴锛歿}", parentPlatformOld.getServerGBId()); }); } - } catch (InvalidArgumentException | ParseException | SipException e) { logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 娉ㄩ攢: {}", e.getMessage()); } @@ -214,9 +198,6 @@ logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈: {}", e.getMessage()); } } - // 閲嶆柊寮�鍚畾鏃舵敞鍐岋紝 浣跨敤缁娑堟伅 - // 閲嶆柊寮�濮嬪績璺充繚娲� - return false; } @@ -225,6 +206,9 @@ @Override public void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo) { logger.info("[鍥芥爣绾ц仈]锛歿}, 骞冲彴涓婄嚎", parentPlatform.getServerGBId()); + final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId(); + dynamicTask.stop(registerFailAgainTaskKey); + platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true); ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); if (parentPlatformCatch == null) { @@ -265,15 +249,9 @@ // 姝ゆ椂鏄涓夋蹇冭烦瓒呮椂锛� 骞冲彴绂荤嚎 if (platformCatch.getKeepAliveReply() == 2) { // 璁剧疆骞冲彴绂荤嚎锛屽苟閲嶆柊娉ㄥ唽 - logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽", parentPlatform.getServerGBId()); - try { - commanderForPlatform.register(parentPlatform, eventResult1 -> { - logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽浠嶇劧澶辫触锛屽紑濮嬪畾鏃跺彂璧锋敞鍐岋紝闂撮殧涓�1鍒嗛挓", parentPlatform.getServerGBId()); - offline(parentPlatform, false); - }, null); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 娉ㄥ唽: {}", e.getMessage()); - } + logger.info("[鍥芥爣绾ц仈] 涓夋蹇冭烦瓒呮椂, 骞冲彴{}({})绂荤嚎", parentPlatform.getName(), parentPlatform.getServerGBId()); + offline(parentPlatform, false); + } }else { @@ -299,21 +277,22 @@ private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){ try { - // 璁剧疆瓒呮椂閲嶅彂锛� 鍚庣画浠庡簳灞傛敮鎸佹秷鎭噸鍙� - String key = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId() + "_timeout"; - if (dynamicTask.isAlive(key)) { - return; + // 涓嶅湪鍚屼竴涓細璇濅腑缁鍒欐瘡娆″叏鏂版敞鍐� + if (!userSetting.isRegisterKeepIntDialog()) { + sipTransactionInfo = null; } - dynamicTask.startDelay(key, ()->{ - registerTask(parentPlatform, sipTransactionInfo); - }, 1000); - logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛屽紑濮嬬画璁�", parentPlatform.getServerGBId()); + + if (sipTransactionInfo == null) { + logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛屽紑濮嬮噸鏂版敞鍐�", parentPlatform.getServerGBId()); + }else { + logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛屽紑濮嬬画璁�", parentPlatform.getServerGBId()); + } + commanderForPlatform.register(parentPlatform, sipTransactionInfo, eventResult -> { - dynamicTask.stop(key); + logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽澶辫触锛寋}:{}", parentPlatform.getServerGBId(), + eventResult.statusCode, eventResult.msg); offline(parentPlatform, false); - },eventResult -> { - dynamicTask.stop(key); - }); + }, null); } catch (InvalidArgumentException | ParseException | SipException e) { logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈瀹氭椂娉ㄥ唽: {}", e.getMessage()); } @@ -334,24 +313,35 @@ // 鍋滄鎵�鏈夋帹娴� logger.info("[骞冲彴绂荤嚎] {}, 鍋滄鎵�鏈夋帹娴�", parentPlatform.getServerGBId()); stopAllPush(parentPlatform.getServerGBId()); - if (stopRegister) { - // 娓呴櫎娉ㄥ唽瀹氭椂 - logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId()); - final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId(); - if (dynamicTask.contains(registerTaskKey)) { - dynamicTask.stop(registerTaskKey); - } + + // 娓呴櫎娉ㄥ唽瀹氭椂 + logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId()); + final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId(); + if (dynamicTask.contains(registerTaskKey)) { + dynamicTask.stop(registerTaskKey); } // 娓呴櫎蹇冭烦瀹氭椂 logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂鍙戦�佸績璺充换鍔�", parentPlatform.getServerGBId()); final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId(); if (dynamicTask.contains(keepaliveTaskKey)) { - // 娣诲姞蹇冭烦浠诲姟 + // 娓呴櫎蹇冭烦浠诲姟 dynamicTask.stop(keepaliveTaskKey); } // 鍋滄鐩綍璁㈤槄鍥炲 logger.info("[骞冲彴绂荤嚎] {}, 鍋滄璁㈤槄鍥炲", parentPlatform.getServerGBId()); subscribeHolder.removeAllSubscribe(parentPlatform.getServerGBId()); + // 鍙戣捣瀹氭椂鑷姩閲嶆柊娉ㄥ唽 + if (!stopRegister) { + // 璁剧疆涓�60绉掕嚜鍔ㄥ皾璇曢噸鏂版敞鍐� + final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId(); + ParentPlatform platform = platformMapper.getParentPlatById(parentPlatform.getId()); + if (platform.isEnable()) { + dynamicTask.startCron(registerFailAgainTaskKey, + ()-> registerTask(platform, null), + userSetting.getRegisterAgainAfterTime() * 1000); + } + + } } private void stopAllPush(String platformId) { @@ -365,7 +355,7 @@ param.put("vhost", "__defaultVhost__"); param.put("app", sendRtpItem.getApp()); param.put("stream", sendRtpItem.getStream()); - zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); + zlmServerFactory.stopSendRtpStream(mediaInfo, param); } } } @@ -432,21 +422,21 @@ } InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, platform.getServerGBId(), channelId); - if (inviteInfo != null && inviteInfo.getStreamInfo() != null) { // 濡傛灉zlm涓嶅瓨鍦ㄨ繖涓祦锛屽垯鍒犻櫎鏁版嵁鍗冲彲 MediaServerItem mediaServerItemForStreamInfo = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId()); if (mediaServerItemForStreamInfo != null) { - Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemForStreamInfo, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream()); + Boolean ready = zlmServerFactory.isStreamReady(mediaServerItemForStreamInfo, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream()); if (!ready) { // 閿欒瀛樺湪浜巖edis涓殑鏁版嵁 inviteStreamService.removeInviteInfo(inviteInfo); }else { // 娴佺‘瀹炲皻鍦ㄦ帹娴侊紝鐩存帴鍥炶皟缁撴灉 - JSONObject json = new JSONObject(); - json.put("app", inviteInfo.getStreamInfo().getApp()); - json.put("stream", inviteInfo.getStreamInfo().getStream()); - hookEvent.response(mediaServerItemForStreamInfo, json); + OnStreamChangedHookParam hookParam = new OnStreamChangedHookParam(); + hookParam.setApp(inviteInfo.getStreamInfo().getApp()); + hookParam.setStream(inviteInfo.getStreamInfo().getStream()); + + hookEvent.response(mediaServerItemForStreamInfo, hookParam); return; } } @@ -499,14 +489,14 @@ } } }, userSetting.getPlayTimeout()); - commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, response)->{ + commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, hookParam)->{ logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 鏀跺埌涓婄骇鎺ㄦ祦 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId); dynamicTask.stop(timeOutTaskKey); // hook鍝嶅簲 - playService.onPublishHandlerForPlay(mediaServerItemForInvite, response, platform.getServerGBId(), channelId); + playService.onPublishHandlerForPlay(mediaServerItemForInvite, hookParam, platform.getServerGBId(), channelId); // 鏀跺埌娴� if (hookEvent != null) { - hookEvent.response(mediaServerItem, response); + hookEvent.response(mediaServerItem, hookParam); } }, event -> { // 鏀跺埌200OK 妫�娴媠src鏄惁鏈夊彉鍖栵紝闃叉涓婄骇鑷畾涔変簡ssrc @@ -525,30 +515,20 @@ logger.info("[鐐规挱娑堟伅] 鏀跺埌invite 200, 鍙戠幇涓嬬骇鑷畾涔変簡ssrc: {}", ssrcInResponse); if (!mediaServerItem.isRtpEnable()) { logger.info("[鐐规挱娑堟伅] SSRC淇 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - - if (!ssrcFactory.checkSsrc(mediaServerItem.getId(), ssrcInResponse)) { - // ssrc 涓嶅彲鐢� - // 閲婃斁ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); - event.msg = "涓嬬骇鑷畾涔変簡ssrc,浣嗘槸姝src涓嶅彲鐢�"; - event.statusCode = 400; - errorEvent.response(event); - return; - } - + // 閲婃斁ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); // 鍗曠鍙fā寮弒treamId涔熸湁鍙樺寲锛岄渶瑕侀噸鏂拌缃洃鍚� if (!mediaServerItem.isRtpEnable()) { // 娣诲姞璁㈤槄 HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); subscribe.removeSubscribe(hookSubscribe); hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { - logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString()); + subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { + logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + hookParam); dynamicTask.stop(timeOutTaskKey); // hook鍝嶅簲 - playService.onPublishHandlerForPlay(mediaServerItemInUse, response, platform.getServerGBId(), channelId); - hookEvent.response(mediaServerItemInUse, response); + playService.onPublishHandlerForPlay(mediaServerItemInUse, hookParam, platform.getServerGBId(), channelId); + hookEvent.response(mediaServerItemInUse, hookParam); }); } // 鍏抽棴rtp server @@ -566,7 +546,23 @@ } @Override - public void stopBroadcast(ParentPlatform platform, String channelId, String stream) throws InvalidArgumentException, ParseException, SsrcTransactionNotFoundException, SipException { - commanderForPlatform.streamByeCmd(platform, channelId, stream, null, null); + public void stopBroadcast(ParentPlatform platform, DeviceChannel channel, String stream, boolean sendBye, MediaServerItem mediaServerItem) { + + try { + if (sendBye) { + commanderForPlatform.streamByeCmd(platform, channel.getChannelId(), stream, null, null); + } + } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { + logger.warn("[娑堟伅鍙戦�佸け璐 鍋滄璇煶瀵硅锛� 骞冲彴锛歿}锛岄�氶亾锛歿}", platform.getId(), channel.getChannelId() ); + } finally { + mediaServerService.closeRTPServer(mediaServerItem, stream); + InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, platform.getServerGBId(), channel.getChannelId(), stream); + if (inviteInfo != null) { + // 閲婃斁ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), inviteInfo.getSsrcInfo().getSsrc()); + inviteStreamService.removeInviteInfo(inviteInfo); + } + streamSession.remove(platform.getServerGBId(), channel.getChannelId(), stream); + } } } -- Gitblit v1.8.0