old mode 100644
new mode 100755
|  |  |  | 
|---|
|  |  |  | import com.genersoft.iot.vmp.conf.UserSetting; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.gb28181.bean.*; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.gb28181.utils.SipUtils; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.service.IMediaServerService; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.service.IPlatformService; | 
|---|
|  |  |  | 
|---|
|  |  |  | import com.genersoft.iot.vmp.utils.DateUtil; | 
|---|
|  |  |  | import com.github.pagehelper.PageHelper; | 
|---|
|  |  |  | import com.github.pagehelper.PageInfo; | 
|---|
|  |  |  | import gov.nist.javax.sip.message.SIPRequest; | 
|---|
|  |  |  | import org.slf4j.Logger; | 
|---|
|  |  |  | import org.slf4j.LoggerFactory; | 
|---|
|  |  |  | import org.springframework.beans.factory.annotation.Autowired; | 
|---|
|  |  |  | import org.springframework.stereotype.Service; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import javax.sip.InvalidArgumentException; | 
|---|
|  |  |  | import javax.sip.PeerUnavailableException; | 
|---|
|  |  |  | import javax.sip.SipException; | 
|---|
|  |  |  | import javax.sip.SipFactory; | 
|---|
|  |  |  | import javax.sip.address.Address; | 
|---|
|  |  |  | import javax.sip.address.SipURI; | 
|---|
|  |  |  | import javax.sip.header.*; | 
|---|
|  |  |  | import javax.sip.message.Request; | 
|---|
|  |  |  | import java.text.ParseException; | 
|---|
|  |  |  | import java.util.HashMap; | 
|---|
|  |  |  | import java.util.List; | 
|---|
|  |  |  | import java.util.Map; | 
|---|
|  |  |  | import java.util.*; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * @author lin | 
|---|
|  |  |  | 
|---|
|  |  |  | public class PlatformServiceImpl implements IPlatformService { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private final static String REGISTER_KEY_PREFIX = "platform_register_"; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private final static String REGISTER_FAIL_AGAIN_KEY_PREFIX = "platform_register_fail_again_"; | 
|---|
|  |  |  | private final static String KEEPALIVE_KEY_PREFIX = "platform_keepalive_"; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private final static Logger logger = LoggerFactory.getLogger(PlatformServiceImpl.class); | 
|---|
|  |  |  | 
|---|
|  |  |  | private IRedisCatchStorage redisCatchStorage; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | private SSRCFactory ssrcFactory; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | private IMediaServerService mediaServerService; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | 
|---|
|  |  |  | private DynamicTask dynamicTask; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | private ZLMRTPServerFactory zlmrtpServerFactory; | 
|---|
|  |  |  | private ZLMServerFactory zlmServerFactory; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | private SubscribeHolder subscribeHolder; | 
|---|
|  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public boolean update(ParentPlatform parentPlatform) { | 
|---|
|  |  |  | logger.info("[国标级联]更新平台 {}", parentPlatform.getDeviceGBId()); | 
|---|
|  |  |  | // TODO 后续版本去除 | 
|---|
|  |  |  | parentPlatform.setTreeType(""); | 
|---|
|  |  |  | parentPlatform.setCharacterSet(parentPlatform.getCharacterSet().toUpperCase()); | 
|---|
|  |  |  | ParentPlatform parentPlatformOld = platformMapper.getParentPlatById(parentPlatform.getId()); | 
|---|
|  |  |  | ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatformOld.getServerGBId()); | 
|---|
|  |  |  | 
|---|
|  |  |  | // 注销旧的 | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | if (parentPlatformOld.isStatus()) { | 
|---|
|  |  |  | logger.info("保存平台{}时发现救平台在线,发送注销命令", parentPlatformOld.getServerGBId()); | 
|---|
|  |  |  | logger.info("保存平台{}时发现旧平台在线,发送注销命令", parentPlatformOld.getServerGBId()); | 
|---|
|  |  |  | commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> { | 
|---|
|  |  |  | logger.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId()); | 
|---|
|  |  |  | }); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | } catch (InvalidArgumentException | ParseException | SipException e) { | 
|---|
|  |  |  | logger.error("[命令发送失败] 国标级联 注销: {}", e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | 
|---|
|  |  |  | logger.error("[命令发送失败] 国标级联: {}", e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | // 重新开启定时注册, 使用续订消息 | 
|---|
|  |  |  | // 重新开始心跳保活 | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | return false; | 
|---|
|  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo) { | 
|---|
|  |  |  | logger.info("[国标级联]:{}, 平台上线", parentPlatform.getServerGBId()); | 
|---|
|  |  |  | final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId(); | 
|---|
|  |  |  | dynamicTask.stop(registerFailAgainTaskKey); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true); | 
|---|
|  |  |  | ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); | 
|---|
|  |  |  | if (parentPlatformCatch == null) { | 
|---|
|  |  |  | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | commanderForPlatform.keepalive(parentPlatform, eventResult -> { | 
|---|
|  |  |  | // 心跳失败 | 
|---|
|  |  |  | if (eventResult.type == SipSubscribe.EventResultType.timeout) { | 
|---|
|  |  |  | // 心跳超时 | 
|---|
|  |  |  | ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); | 
|---|
|  |  |  | // 此时是第三次心跳超时, 平台离线 | 
|---|
|  |  |  | if (platformCatch.getKeepAliveReply()  == 2) { | 
|---|
|  |  |  | // 设置平台离线,并重新注册 | 
|---|
|  |  |  | logger.info("[国标级联] {},三次心跳超时后再次发起注册", parentPlatform.getServerGBId()); | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | commanderForPlatform.register(parentPlatform, eventResult1 -> { | 
|---|
|  |  |  | logger.info("[国标级联] {},三次心跳超时后再次发起注册仍然失败,开始定时发起注册,间隔为1分钟", parentPlatform.getServerGBId()); | 
|---|
|  |  |  | offline(parentPlatform, false); | 
|---|
|  |  |  | }, null); | 
|---|
|  |  |  | } catch (InvalidArgumentException | ParseException | SipException e) { | 
|---|
|  |  |  | logger.error("[命令发送失败] 国标级联 注册: {}", e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | }else { | 
|---|
|  |  |  | if (eventResult.type != SipSubscribe.EventResultType.timeout) { | 
|---|
|  |  |  | logger.warn("[国标级联]发送心跳收到错误,code: {}, msg: {}", eventResult.statusCode, eventResult.msg); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | // 心跳失败 | 
|---|
|  |  |  | ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); | 
|---|
|  |  |  | // 此时是第三次心跳超时, 平台离线 | 
|---|
|  |  |  | if (platformCatch.getKeepAliveReply()  == 2) { | 
|---|
|  |  |  | // 设置平台离线,并重新注册 | 
|---|
|  |  |  | logger.info("[国标级联] 三次心跳失败, 平台{}({})离线", parentPlatform.getName(), parentPlatform.getServerGBId()); | 
|---|
|  |  |  | offline(parentPlatform, false); | 
|---|
|  |  |  | }else { | 
|---|
|  |  |  | platformCatch.setKeepAliveReply(platformCatch.getKeepAliveReply() + 1); | 
|---|
|  |  |  | redisCatchStorage.updatePlatformCatchInfo(platformCatch); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | }, eventResult -> { | 
|---|
|  |  |  | 
|---|
|  |  |  | platformCatch.setKeepAliveReply(0); | 
|---|
|  |  |  | redisCatchStorage.updatePlatformCatchInfo(platformCatch); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | logger.info("[发送心跳] 国标级联 发送心跳, code: {}, msg: {}", eventResult.statusCode, eventResult.msg); | 
|---|
|  |  |  | }); | 
|---|
|  |  |  | } catch (SipException | InvalidArgumentException | ParseException e) { | 
|---|
|  |  |  | logger.error("[命令发送失败] 国标级联 发送心跳: {}", e.getMessage()); | 
|---|
|  |  |  | 
|---|
|  |  |  | }, | 
|---|
|  |  |  | (parentPlatform.getKeepTimeout())*1000); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | if (parentPlatform.isAutoPushChannel()) { | 
|---|
|  |  |  | if (subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()) == null) { | 
|---|
|  |  |  | addSimulatedSubscribeInfo(parentPlatform); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | }else { | 
|---|
|  |  |  | SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()); | 
|---|
|  |  |  | if (catalogSubscribe != null && catalogSubscribe.getExpires() == -1) { | 
|---|
|  |  |  | subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void addSimulatedSubscribeInfo(ParentPlatform parentPlatform) { | 
|---|
|  |  |  | // 自动添加一条模拟的订阅信息 | 
|---|
|  |  |  | SubscribeInfo subscribeInfo = new SubscribeInfo(); | 
|---|
|  |  |  | subscribeInfo.setId(parentPlatform.getServerGBId()); | 
|---|
|  |  |  | subscribeInfo.setExpires(-1); | 
|---|
|  |  |  | subscribeInfo.setEventType("Catalog"); | 
|---|
|  |  |  | int random = (int) Math.floor(Math.random() * 10000); | 
|---|
|  |  |  | subscribeInfo.setEventId(random + ""); | 
|---|
|  |  |  | subscribeInfo.setSimulatedCallId(UUID.randomUUID().toString().replace("-", "") + "@" + parentPlatform.getServerIP()); | 
|---|
|  |  |  | subscribeInfo.setSimulatedFromTag(UUID.randomUUID().toString().replace("-", "")); | 
|---|
|  |  |  | subscribeInfo.setSimulatedToTag(UUID.randomUUID().toString().replace("-", "")); | 
|---|
|  |  |  | subscribeHolder.putCatalogSubscribe(parentPlatform.getServerGBId(), subscribeInfo); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){ | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | // 设置超时重发, 后续从底层支持消息重发 | 
|---|
|  |  |  | String key = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId() + "_timeout"; | 
|---|
|  |  |  | if (dynamicTask.isAlive(key)) { | 
|---|
|  |  |  | return; | 
|---|
|  |  |  | // 不在同一个会话中续订则每次全新注册 | 
|---|
|  |  |  | if (!userSetting.isRegisterKeepIntDialog()) { | 
|---|
|  |  |  | sipTransactionInfo = null; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | dynamicTask.startDelay(key, ()->{ | 
|---|
|  |  |  | registerTask(parentPlatform, sipTransactionInfo); | 
|---|
|  |  |  | }, 1000); | 
|---|
|  |  |  | logger.info("[国标级联] 平台:{}注册即将到期,开始续订", parentPlatform.getServerGBId()); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | if (sipTransactionInfo == null) { | 
|---|
|  |  |  | logger.info("[国标级联] 平台:{}注册即将到期,开始重新注册", parentPlatform.getServerGBId()); | 
|---|
|  |  |  | }else { | 
|---|
|  |  |  | logger.info("[国标级联] 平台:{}注册即将到期,开始续订", parentPlatform.getServerGBId()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | commanderForPlatform.register(parentPlatform, sipTransactionInfo,  eventResult -> { | 
|---|
|  |  |  | dynamicTask.stop(key); | 
|---|
|  |  |  | logger.info("[国标级联] 平台:{}注册失败,{}:{}", parentPlatform.getServerGBId(), | 
|---|
|  |  |  | eventResult.statusCode, eventResult.msg); | 
|---|
|  |  |  | offline(parentPlatform, false); | 
|---|
|  |  |  | },eventResult -> { | 
|---|
|  |  |  | dynamicTask.stop(key); | 
|---|
|  |  |  | }); | 
|---|
|  |  |  | } catch (InvalidArgumentException | ParseException | SipException e) { | 
|---|
|  |  |  | }, null); | 
|---|
|  |  |  | } catch (Exception e) { | 
|---|
|  |  |  | logger.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | 
|---|
|  |  |  | // 停止所有推流 | 
|---|
|  |  |  | logger.info("[平台离线] {}, 停止所有推流", parentPlatform.getServerGBId()); | 
|---|
|  |  |  | stopAllPush(parentPlatform.getServerGBId()); | 
|---|
|  |  |  | if (stopRegister) { | 
|---|
|  |  |  | // 清除注册定时 | 
|---|
|  |  |  | logger.info("[平台离线] {}, 停止定时注册任务", parentPlatform.getServerGBId()); | 
|---|
|  |  |  | final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId(); | 
|---|
|  |  |  | if (dynamicTask.contains(registerTaskKey)) { | 
|---|
|  |  |  | dynamicTask.stop(registerTaskKey); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | // 清除注册定时 | 
|---|
|  |  |  | logger.info("[平台离线] {}, 停止定时注册任务", parentPlatform.getServerGBId()); | 
|---|
|  |  |  | final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId(); | 
|---|
|  |  |  | if (dynamicTask.contains(registerTaskKey)) { | 
|---|
|  |  |  | dynamicTask.stop(registerTaskKey); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | // 清除心跳定时 | 
|---|
|  |  |  | logger.info("[平台离线] {}, 停止定时发送心跳任务", parentPlatform.getServerGBId()); | 
|---|
|  |  |  | final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId(); | 
|---|
|  |  |  | if (dynamicTask.contains(keepaliveTaskKey)) { | 
|---|
|  |  |  | // 添加心跳任务 | 
|---|
|  |  |  | // 清除心跳任务 | 
|---|
|  |  |  | dynamicTask.stop(keepaliveTaskKey); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | // 停止目录订阅回复 | 
|---|
|  |  |  | logger.info("[平台离线] {}, 停止订阅回复", parentPlatform.getServerGBId()); | 
|---|
|  |  |  | subscribeHolder.removeAllSubscribe(parentPlatform.getServerGBId()); | 
|---|
|  |  |  | // 发起定时自动重新注册 | 
|---|
|  |  |  | if (!stopRegister) { | 
|---|
|  |  |  | // 设置为60秒自动尝试重新注册 | 
|---|
|  |  |  | final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId(); | 
|---|
|  |  |  | ParentPlatform platform = platformMapper.getParentPlatById(parentPlatform.getId()); | 
|---|
|  |  |  | if (platform.isEnable()) { | 
|---|
|  |  |  | dynamicTask.startCron(registerFailAgainTaskKey, | 
|---|
|  |  |  | ()-> registerTask(platform, null), | 
|---|
|  |  |  | userSetting.getRegisterAgainAfterTime() * 1000); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private void stopAllPush(String platformId) { | 
|---|
|  |  |  | List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(platformId); | 
|---|
|  |  |  | if (sendRtpItems != null && sendRtpItems.size() > 0) { | 
|---|
|  |  |  | for (SendRtpItem sendRtpItem : sendRtpItems) { | 
|---|
|  |  |  | ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); | 
|---|
|  |  |  | redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null); | 
|---|
|  |  |  | MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); | 
|---|
|  |  |  | Map<String, Object> param = new HashMap<>(3); | 
|---|
|  |  |  | param.put("vhost", "__defaultVhost__"); | 
|---|
|  |  |  | param.put("app", sendRtpItem.getApp()); | 
|---|
|  |  |  | param.put("stream", sendRtpItem.getStreamId()); | 
|---|
|  |  |  | zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); | 
|---|
|  |  |  | zlmServerFactory.stopSendRtpStream(mediaInfo, param); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|