old mode 100644
new mode 100755
| | |
| | | package com.genersoft.iot.vmp.service.impl; |
| | | |
| | | import com.baomidou.dynamic.datasource.annotation.DS; |
| | | import com.genersoft.iot.vmp.common.*; |
| | | import com.genersoft.iot.vmp.conf.DynamicTask; |
| | | import com.genersoft.iot.vmp.conf.UserSetting; |
| | | import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; |
| | | import com.genersoft.iot.vmp.gb28181.bean.*; |
| | | import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; |
| | | import com.genersoft.iot.vmp.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; |
| | | import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; |
| | | import com.genersoft.iot.vmp.gb28181.utils.SipUtils; |
| | | import com.genersoft.iot.vmp.media.event.hook.HookData; |
| | | import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; |
| | | import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent; |
| | | import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; |
| | | import com.genersoft.iot.vmp.media.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; |
| | | import com.genersoft.iot.vmp.media.bean.MediaServer; |
| | | import com.genersoft.iot.vmp.service.IInviteStreamService; |
| | | import com.genersoft.iot.vmp.service.IPlatformService; |
| | | import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; |
| | | import com.genersoft.iot.vmp.service.IPlayService; |
| | | import com.genersoft.iot.vmp.service.bean.*; |
| | | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
| | | import com.genersoft.iot.vmp.storager.dao.*; |
| | | import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; |
| | | import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; |
| | | import com.genersoft.iot.vmp.utils.DateUtil; |
| | | import com.github.pagehelper.PageHelper; |
| | | import com.github.pagehelper.PageInfo; |
| | | import gov.nist.javax.sip.message.SIPResponse; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.context.event.EventListener; |
| | | import org.springframework.scheduling.annotation.Async; |
| | | import org.springframework.stereotype.Service; |
| | | |
| | | import javax.sdp.*; |
| | | import javax.sip.InvalidArgumentException; |
| | | import javax.sip.ResponseEvent; |
| | | import javax.sip.SipException; |
| | | import java.text.ParseException; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.*; |
| | | |
| | | /** |
| | | * @author lin |
| | | */ |
| | | @Service |
| | | @DS("master") |
| | | public class PlatformServiceImpl implements IPlatformService { |
| | | |
| | | private final static String REGISTER_KEY_PREFIX = "platform_register_"; |
| | |
| | | private ParentPlatformMapper platformMapper; |
| | | |
| | | @Autowired |
| | | private PlatformCatalogMapper catalogMapper; |
| | | |
| | | @Autowired |
| | | private PlatformChannelMapper platformChannelMapper; |
| | | |
| | | @Autowired |
| | | private PlatformGbStreamMapper platformGbStreamMapper; |
| | | |
| | | @Autowired |
| | | private IRedisCatchStorage redisCatchStorage; |
| | | |
| | | @Autowired |
| | | private SSRCFactory ssrcFactory; |
| | | |
| | | @Autowired |
| | | private IMediaServerService mediaServerService; |
| | | |
| | | @Autowired |
| | | private SIPCommanderFroPlatform commanderForPlatform; |
| | | private ISIPCommanderForPlatform commanderForPlatform; |
| | | |
| | | @Autowired |
| | | private DynamicTask dynamicTask; |
| | | |
| | | @Autowired |
| | | private ZLMRTPServerFactory zlmrtpServerFactory; |
| | | private ZLMServerFactory zlmServerFactory; |
| | | |
| | | @Autowired |
| | | private SubscribeHolder subscribeHolder; |
| | |
| | | @Autowired |
| | | private UserSetting userSetting; |
| | | |
| | | @Autowired |
| | | private HookSubscribe subscribe; |
| | | |
| | | @Autowired |
| | | private VideoStreamSessionManager streamSession; |
| | | |
| | | @Autowired |
| | | private IPlayService playService; |
| | | |
| | | @Autowired |
| | | private IInviteStreamService inviteStreamService; |
| | | |
| | | |
| | | /** |
| | | * 流离开的处理 |
| | | */ |
| | | @Async("taskExecutor") |
| | | @EventListener |
| | | public void onApplicationEvent(MediaDepartureEvent event) { |
| | | List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream()); |
| | | if (!sendRtpItems.isEmpty()) { |
| | | for (SendRtpItem sendRtpItem : sendRtpItems) { |
| | | if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) { |
| | | String platformId = sendRtpItem.getPlatformId(); |
| | | ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformId); |
| | | |
| | | try { |
| | | if (platform != null) { |
| | | commanderForPlatform.streamByeCmd(platform, sendRtpItem); |
| | | redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), |
| | | sendRtpItem.getCallId(), sendRtpItem.getStream()); |
| | | } |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | logger.error("[命令发送失败] 发送BYE: {}", e.getMessage()); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 发流停止 |
| | | */ |
| | | @Async("taskExecutor") |
| | | @EventListener |
| | | public void onApplicationEvent(MediaSendRtpStoppedEvent event) { |
| | | List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream()); |
| | | if (sendRtpItems != null && !sendRtpItems.isEmpty()) { |
| | | for (SendRtpItem sendRtpItem : sendRtpItems) { |
| | | ParentPlatform parentPlatform = platformMapper.getParentPlatByServerGBId(sendRtpItem.getPlatformId()); |
| | | ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); |
| | | try { |
| | | commanderForPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId()); |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); |
| | | } |
| | | redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), |
| | | sendRtpItem.getCallId(), sendRtpItem.getStream()); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | @Override |
| | |
| | | // 行政区划默认去编号的前6位 |
| | | parentPlatform.setAdministrativeDivision(parentPlatform.getServerGBId().substring(0,6)); |
| | | } |
| | | parentPlatform.setTreeType("CivilCode"); |
| | | parentPlatform.setCatalogId(parentPlatform.getDeviceGBId()); |
| | | int result = platformMapper.addParentPlatform(parentPlatform); |
| | | // 添加缓存 |
| | |
| | | @Override |
| | | public boolean update(ParentPlatform parentPlatform) { |
| | | logger.info("[国标级联]更新平台 {}", parentPlatform.getDeviceGBId()); |
| | | // TODO 后续版本去除 |
| | | parentPlatform.setTreeType(""); |
| | | parentPlatform.setCharacterSet(parentPlatform.getCharacterSet().toUpperCase()); |
| | | ParentPlatform parentPlatformOld = platformMapper.getParentPlatById(parentPlatform.getId()); |
| | | ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatformOld.getServerGBId()); |
| | |
| | | dynamicTask.stop(registerTaskKey); |
| | | // 注销旧的 |
| | | try { |
| | | if (parentPlatformOld.isStatus()) { |
| | | if (parentPlatformOld.isStatus() && parentPlatformCatchOld != null) { |
| | | logger.info("保存平台{}时发现旧平台在线,发送注销命令", parentPlatformOld.getServerGBId()); |
| | | commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> { |
| | | logger.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId()); |
| | |
| | | logger.error("[命令发送失败] 国标级联: {}", e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | |
| | | return false; |
| | | } |
| | |
| | | try { |
| | | commanderForPlatform.keepalive(parentPlatform, eventResult -> { |
| | | // 心跳失败 |
| | | if (eventResult.type == SipSubscribe.EventResultType.timeout) { |
| | | // 心跳超时 |
| | | ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); |
| | | // 此时是第三次心跳超时, 平台离线 |
| | | if (platformCatch.getKeepAliveReply() == 2) { |
| | | // 设置平台离线,并重新注册 |
| | | logger.info("[国标级联] 三次心跳超时, 平台{}({})离线", parentPlatform.getName(), parentPlatform.getServerGBId()); |
| | | offline(parentPlatform, false); |
| | | |
| | | } |
| | | |
| | | }else { |
| | | if (eventResult.type != SipSubscribe.EventResultType.timeout) { |
| | | logger.warn("[国标级联]发送心跳收到错误,code: {}, msg: {}", eventResult.statusCode, eventResult.msg); |
| | | } |
| | | // 心跳失败 |
| | | ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); |
| | | // 此时是第三次心跳超时, 平台离线 |
| | | if (platformCatch.getKeepAliveReply() == 2) { |
| | | // 设置平台离线,并重新注册 |
| | | logger.info("[国标级联] 三次心跳失败, 平台{}({})离线", parentPlatform.getName(), parentPlatform.getServerGBId()); |
| | | offline(parentPlatform, false); |
| | | }else { |
| | | platformCatch.setKeepAliveReply(platformCatch.getKeepAliveReply() + 1); |
| | | redisCatchStorage.updatePlatformCatchInfo(platformCatch); |
| | | } |
| | | |
| | | }, eventResult -> { |
| | |
| | | platformCatch.setKeepAliveReply(0); |
| | | redisCatchStorage.updatePlatformCatchInfo(platformCatch); |
| | | } |
| | | logger.info("[发送心跳] 国标级联 发送心跳, code: {}, msg: {}", eventResult.statusCode, eventResult.msg); |
| | | }); |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | logger.error("[命令发送失败] 国标级联 发送心跳: {}", e.getMessage()); |
| | |
| | | }, |
| | | (parentPlatform.getKeepTimeout())*1000); |
| | | } |
| | | if (parentPlatform.isAutoPushChannel()) { |
| | | if (subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()) == null) { |
| | | logger.info("[国标级联]:{}, 添加自动通道推送模拟订阅信息", parentPlatform.getServerGBId()); |
| | | addSimulatedSubscribeInfo(parentPlatform); |
| | | } |
| | | }else { |
| | | SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()); |
| | | if (catalogSubscribe != null && catalogSubscribe.getExpires() == -1) { |
| | | subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId()); |
| | | } |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void addSimulatedSubscribeInfo(ParentPlatform parentPlatform) { |
| | | // 自动添加一条模拟的订阅信息 |
| | | SubscribeInfo subscribeInfo = new SubscribeInfo(); |
| | | subscribeInfo.setId(parentPlatform.getServerGBId()); |
| | | subscribeInfo.setExpires(-1); |
| | | subscribeInfo.setEventType("Catalog"); |
| | | int random = (int) Math.floor(Math.random() * 10000); |
| | | subscribeInfo.setEventId(random + ""); |
| | | subscribeInfo.setSimulatedCallId(UUID.randomUUID().toString().replace("-", "") + "@" + parentPlatform.getServerIP()); |
| | | subscribeInfo.setSimulatedFromTag(UUID.randomUUID().toString().replace("-", "")); |
| | | subscribeInfo.setSimulatedToTag(UUID.randomUUID().toString().replace("-", "")); |
| | | subscribeHolder.putCatalogSubscribe(parentPlatform.getServerGBId(), subscribeInfo); |
| | | } |
| | | |
| | | private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){ |
| | |
| | | eventResult.statusCode, eventResult.msg); |
| | | offline(parentPlatform, false); |
| | | }, null); |
| | | } catch (InvalidArgumentException | ParseException | SipException e) { |
| | | } catch (Exception e) { |
| | | logger.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage()); |
| | | } |
| | | } |
| | |
| | | // 清除心跳任务 |
| | | dynamicTask.stop(keepaliveTaskKey); |
| | | } |
| | | // 停止目录订阅回复 |
| | | logger.info("[平台离线] {}, 停止订阅回复", parentPlatform.getServerGBId()); |
| | | subscribeHolder.removeAllSubscribe(parentPlatform.getServerGBId()); |
| | | // 停止订阅回复 |
| | | SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()); |
| | | if (catalogSubscribe != null) { |
| | | if (catalogSubscribe.getExpires() > 0) { |
| | | logger.info("[平台离线] {}, 停止目录订阅回复", parentPlatform.getServerGBId()); |
| | | subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId()); |
| | | } |
| | | } |
| | | logger.info("[平台离线] {}, 停止移动位置订阅回复", parentPlatform.getServerGBId()); |
| | | subscribeHolder.removeMobilePositionSubscribe(parentPlatform.getServerGBId()); |
| | | // 发起定时自动重新注册 |
| | | if (!stopRegister) { |
| | | // 设置为60秒自动尝试重新注册 |
| | |
| | | ()-> registerTask(platform, null), |
| | | userSetting.getRegisterAgainAfterTime() * 1000); |
| | | } |
| | | |
| | | } |
| | | } |
| | | |
| | |
| | | List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(platformId); |
| | | if (sendRtpItems != null && sendRtpItems.size() > 0) { |
| | | for (SendRtpItem sendRtpItem : sendRtpItems) { |
| | | ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); |
| | | redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null); |
| | | MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); |
| | | MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); |
| | | Map<String, Object> param = new HashMap<>(3); |
| | | param.put("vhost", "__defaultVhost__"); |
| | | param.put("app", sendRtpItem.getApp()); |
| | | param.put("stream", sendRtpItem.getStreamId()); |
| | | zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); |
| | | param.put("stream", sendRtpItem.getStream()); |
| | | zlmServerFactory.stopSendRtpStream(mediaInfo, param); |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void broadcastInvite(ParentPlatform platform, String channelId, MediaServer mediaServerItem, HookSubscribe.Event hookEvent, |
| | | SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException { |
| | | |
| | | if (mediaServerItem == null) { |
| | | logger.info("[国标级联] 语音喊话未找到可用的zlm. platform: {}", platform.getServerGBId()); |
| | | return; |
| | | } |
| | | InviteInfo inviteInfoForOld = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, platform.getServerGBId(), channelId); |
| | | |
| | | if (inviteInfoForOld != null && inviteInfoForOld.getStreamInfo() != null) { |
| | | // 如果zlm不存在这个流,则删除数据即可 |
| | | MediaServer mediaServerItemForStreamInfo = mediaServerService.getOne(inviteInfoForOld.getStreamInfo().getMediaServerId()); |
| | | if (mediaServerItemForStreamInfo != null) { |
| | | Boolean ready = mediaServerService.isStreamReady(mediaServerItemForStreamInfo, inviteInfoForOld.getStreamInfo().getApp(), inviteInfoForOld.getStreamInfo().getStream()); |
| | | if (!ready) { |
| | | // 错误存在于redis中的数据 |
| | | inviteStreamService.removeInviteInfo(inviteInfoForOld); |
| | | }else { |
| | | // 流确实尚在推流,直接回调结果 |
| | | HookData hookData = new HookData(); |
| | | hookData.setApp(inviteInfoForOld.getStreamInfo().getApp()); |
| | | hookData.setStream(inviteInfoForOld.getStreamInfo().getStream()); |
| | | hookData.setMediaServer(mediaServerItemForStreamInfo); |
| | | hookEvent.response(hookData); |
| | | return; |
| | | } |
| | | } |
| | | } |
| | | |
| | | String streamId = null; |
| | | if (mediaServerItem.isRtpEnable()) { |
| | | streamId = String.format("%s_%s", platform.getServerGBId(), channelId); |
| | | } |
| | | // 默认不进行SSRC校验, TODO 后续可改为配置 |
| | | boolean ssrcCheck = false; |
| | | int tcpMode; |
| | | if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-PASSIVE")) { |
| | | tcpMode = 1; |
| | | }else if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-ACTIVE")) { |
| | | tcpMode = 2; |
| | | } else { |
| | | tcpMode = 0; |
| | | } |
| | | SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true, false, false, tcpMode); |
| | | if (ssrcInfo == null || ssrcInfo.getPort() < 0) { |
| | | logger.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel: {}", platform.getServerGBId(), channelId); |
| | | SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>(); |
| | | eventResult.statusCode = -1; |
| | | eventResult.msg = "端口监听失败"; |
| | | eventResult.type = SipSubscribe.EventResultType.failedToGetPort; |
| | | errorEvent.response(eventResult); |
| | | return; |
| | | } |
| | | logger.info("[国标级联] 语音喊话,发起Invite消息 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", |
| | | platform.getServerGBId(), channelId, ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck); |
| | | |
| | | // 初始化redis中的invite消息状态 |
| | | InviteInfo inviteInfo = InviteInfo.getInviteInfo(platform.getServerGBId(), channelId, ssrcInfo.getStream(), ssrcInfo, |
| | | mediaServerItem.getSdpIp(), ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), InviteSessionType.BROADCAST, |
| | | InviteSessionStatus.ready); |
| | | inviteStreamService.updateInviteInfo(inviteInfo); |
| | | String timeOutTaskKey = UUID.randomUUID().toString(); |
| | | dynamicTask.startDelay(timeOutTaskKey, () -> { |
| | | // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况 |
| | | InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, platform.getServerGBId(), channelId, null); |
| | | if (inviteInfoForBroadcast == null) { |
| | | logger.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc()); |
| | | // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 |
| | | try { |
| | | commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null); |
| | | } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { |
| | | logger.error("[点播超时], 发送BYE失败 {}", e.getMessage()); |
| | | } finally { |
| | | timeoutCallback.run(1, "收流超时"); |
| | | mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); |
| | | mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); |
| | | streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); |
| | | mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); |
| | | } |
| | | } |
| | | }, userSetting.getPlayTimeout()); |
| | | commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (hookData)->{ |
| | | logger.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId); |
| | | dynamicTask.stop(timeOutTaskKey); |
| | | // hook响应 |
| | | playService.onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), platform.getServerGBId(), channelId); |
| | | // 收到流 |
| | | if (hookEvent != null) { |
| | | hookEvent.response(hookData); |
| | | } |
| | | }, event -> { |
| | | |
| | | inviteOKHandler(event, ssrcInfo, tcpMode, ssrcCheck, mediaServerItem, platform, channelId, timeOutTaskKey, |
| | | null, inviteInfo, InviteSessionType.BROADCAST); |
| | | // // 收到200OK 检测ssrc是否有变化,防止上级自定义了ssrc |
| | | // ResponseEvent responseEvent = (ResponseEvent) event.event; |
| | | // String contentString = new String(responseEvent.getResponse().getRawContent()); |
| | | // // 获取ssrc |
| | | // int ssrcIndex = contentString.indexOf("y="); |
| | | // // 检查是否有y字段 |
| | | // if (ssrcIndex >= 0) { |
| | | // //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容 |
| | | // String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); |
| | | // // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 |
| | | // if (ssrcInfo.getSsrc().equals(ssrcInResponse) || ssrcCheck) { |
| | | // tcpActiveHandler(platform, ) |
| | | // return; |
| | | // } |
| | | // logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); |
| | | // if (!mediaServerItem.isRtpEnable()) { |
| | | // logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); |
| | | // // 释放ssrc |
| | | // mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); |
| | | // // 单端口模式streamId也有变化,需要重新设置监听 |
| | | // if (!mediaServerItem.isRtpEnable()) { |
| | | // // 添加订阅 |
| | | // HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); |
| | | // subscribe.removeSubscribe(hookSubscribe); |
| | | // hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); |
| | | // subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { |
| | | // logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam); |
| | | // dynamicTask.stop(timeOutTaskKey); |
| | | // // hook响应 |
| | | // playService.onPublishHandlerForPlay(mediaServerItemInUse, hookParam, platform.getServerGBId(), channelId); |
| | | // hookEvent.response(mediaServerItemInUse, hookParam); |
| | | // }); |
| | | // } |
| | | // // 关闭rtp server |
| | | // mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); |
| | | // // 重新开启ssrc server |
| | | // mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, false, false, ssrcInfo.getPort(), true, false, tcpMode); |
| | | // } |
| | | // } |
| | | }, eventResult -> { |
| | | // 收到错误回复 |
| | | if (errorEvent != null) { |
| | | errorEvent.response(eventResult); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | private void inviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, int tcpMode, boolean ssrcCheck, MediaServer mediaServerItem, |
| | | ParentPlatform platform, String channelId, String timeOutTaskKey, ErrorCallback<Object> callback, |
| | | InviteInfo inviteInfo, InviteSessionType inviteSessionType){ |
| | | inviteInfo.setStatus(InviteSessionStatus.ok); |
| | | ResponseEvent responseEvent = (ResponseEvent) eventResult.event; |
| | | String contentString = new String(responseEvent.getResponse().getRawContent()); |
| | | System.out.println(contentString); |
| | | String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString); |
| | | // 兼容回复的消息中缺少ssrc(y字段)的情况 |
| | | if (ssrcInResponse == null) { |
| | | ssrcInResponse = ssrcInfo.getSsrc(); |
| | | } |
| | | if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { |
| | | // ssrc 一致 |
| | | if (mediaServerItem.isRtpEnable()) { |
| | | // 多端口 |
| | | if (tcpMode == 2) { |
| | | tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck, |
| | | timeOutTaskKey, ssrcInfo, callback); |
| | | } |
| | | }else { |
| | | // 单端口 |
| | | if (tcpMode == 2) { |
| | | logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流"); |
| | | } |
| | | } |
| | | }else { |
| | | logger.info("[Invite 200OK] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); |
| | | // ssrc 不一致 |
| | | if (mediaServerItem.isRtpEnable()) { |
| | | // 多端口 |
| | | if (ssrcCheck) { |
| | | // ssrc检验 |
| | | // 更新ssrc |
| | | logger.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); |
| | | // 释放ssrc |
| | | mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); |
| | | Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse); |
| | | if (!result) { |
| | | try { |
| | | logger.warn("[Invite 200OK] 更新ssrc失败,停止喊话 {}/{}", platform.getServerGBId(), channelId); |
| | | commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null); |
| | | } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { |
| | | logger.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage()); |
| | | } |
| | | |
| | | dynamicTask.stop(timeOutTaskKey); |
| | | // 释放ssrc |
| | | mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); |
| | | |
| | | streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); |
| | | |
| | | callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), |
| | | "下级自定义了ssrc,重新设置收流信息失败", null); |
| | | inviteStreamService.call(inviteSessionType, platform.getServerGBId(), channelId, null, |
| | | InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), |
| | | "下级自定义了ssrc,重新设置收流信息失败", null); |
| | | |
| | | }else { |
| | | ssrcInfo.setSsrc(ssrcInResponse); |
| | | inviteInfo.setSsrcInfo(ssrcInfo); |
| | | inviteInfo.setStream(ssrcInfo.getStream()); |
| | | if (tcpMode == 2) { |
| | | if (mediaServerItem.isRtpEnable()) { |
| | | tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck, |
| | | timeOutTaskKey, ssrcInfo, callback); |
| | | }else { |
| | | logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流"); |
| | | } |
| | | } |
| | | inviteStreamService.updateInviteInfo(inviteInfo); |
| | | } |
| | | }else { |
| | | ssrcInfo.setSsrc(ssrcInResponse); |
| | | inviteInfo.setSsrcInfo(ssrcInfo); |
| | | inviteInfo.setStream(ssrcInfo.getStream()); |
| | | if (tcpMode == 2) { |
| | | if (mediaServerItem.isRtpEnable()) { |
| | | tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck, |
| | | timeOutTaskKey, ssrcInfo, callback); |
| | | }else { |
| | | logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流"); |
| | | } |
| | | } |
| | | inviteStreamService.updateInviteInfo(inviteInfo); |
| | | } |
| | | }else { |
| | | if (ssrcInResponse != null) { |
| | | // 单端口 |
| | | // 重新订阅流上线 |
| | | SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getDeviceId(), |
| | | inviteInfo.getChannelId(), null, inviteInfo.getStream()); |
| | | streamSession.remove(inviteInfo.getDeviceId(), |
| | | inviteInfo.getChannelId(), inviteInfo.getStream()); |
| | | inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse); |
| | | streamSession.put(platform.getServerGBId(), channelId, ssrcTransaction.getCallId(), |
| | | inviteInfo.getStream(), ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | private void tcpActiveHandler(ParentPlatform platform, String channelId, String contentString, |
| | | MediaServer mediaServerItem, int tcpMode, boolean ssrcCheck, |
| | | String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback<Object> callback){ |
| | | if (tcpMode != 2) { |
| | | return; |
| | | } |
| | | |
| | | String substring; |
| | | if (contentString.indexOf("y=") > 0) { |
| | | substring = contentString.substring(0, contentString.indexOf("y=")); |
| | | }else { |
| | | substring = contentString; |
| | | } |
| | | try { |
| | | SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); |
| | | int port = -1; |
| | | Vector mediaDescriptions = sdp.getMediaDescriptions(true); |
| | | for (Object description : mediaDescriptions) { |
| | | MediaDescription mediaDescription = (MediaDescription) description; |
| | | Media media = mediaDescription.getMedia(); |
| | | |
| | | Vector mediaFormats = media.getMediaFormats(false); |
| | | if (mediaFormats.contains("8") || mediaFormats.contains("0")) { |
| | | port = media.getMediaPort(); |
| | | break; |
| | | } |
| | | } |
| | | logger.info("[TCP主动连接对方] serverGbId: {}, channelId: {}, 连接对方的地址:{}:{}, SSRC: {}, SSRC校验:{}", |
| | | platform.getServerGBId(), channelId, sdp.getConnection().getAddress(), port, ssrcInfo.getSsrc(), ssrcCheck); |
| | | Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream()); |
| | | logger.info("[TCP主动连接对方] 结果: {}", result); |
| | | } catch (SdpException e) { |
| | | logger.error("[TCP主动连接对方] serverGbId: {}, channelId: {}, 解析200OK的SDP信息失败", platform.getServerGBId(), channelId, e); |
| | | dynamicTask.stop(timeOutTaskKey); |
| | | mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); |
| | | // 释放ssrc |
| | | mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); |
| | | |
| | | streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); |
| | | |
| | | callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), |
| | | InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); |
| | | inviteStreamService.call(InviteSessionType.PLAY, platform.getServerGBId(), channelId, null, |
| | | InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), |
| | | InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void stopBroadcast(ParentPlatform platform, DeviceChannel channel, String stream, boolean sendBye, MediaServer mediaServerItem) { |
| | | |
| | | try { |
| | | if (sendBye) { |
| | | commanderForPlatform.streamByeCmd(platform, channel.getChannelId(), stream, null, null); |
| | | } |
| | | } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { |
| | | logger.warn("[消息发送失败] 停止语音对讲, 平台:{},通道:{}", platform.getId(), channel.getChannelId() ); |
| | | } finally { |
| | | mediaServerService.closeRTPServer(mediaServerItem, stream); |
| | | InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, platform.getServerGBId(), channel.getChannelId(), stream); |
| | | if (inviteInfo != null) { |
| | | // 释放ssrc |
| | | mediaServerService.releaseSsrc(mediaServerItem.getId(), inviteInfo.getSsrcInfo().getSsrc()); |
| | | inviteStreamService.removeInviteInfo(inviteInfo); |
| | | } |
| | | streamSession.remove(platform.getServerGBId(), channel.getChannelId(), stream); |
| | | } |
| | | } |
| | | } |