sql/update.sql
@@ -1,12 +1,4 @@ alter table parent_platform add startOfflinePush int default 0 null; alter table stream_push add serverId varchar(50) not null; alter table parent_platform add administrativeDivision varchar(50) not null; alter table parent_platform add catalogGroup int default 1 null; alter table device add ssrcCheck int default 0 null; src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -97,4 +97,5 @@ //************************** 第三方 **************************************** public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_"; public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_"; } src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java
@@ -2,7 +2,9 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.service.impl.RedisAlarmMsgListener; import com.genersoft.iot.vmp.service.impl.RedisGPSMsgListener; import com.genersoft.iot.vmp.service.impl.RedisGpsMsgListener; import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.service.impl.RedisStreamMsgListener; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -47,10 +49,16 @@ private int poolMaxWait; @Autowired private RedisGPSMsgListener redisGPSMsgListener; private RedisGpsMsgListener redisGPSMsgListener; @Autowired private RedisAlarmMsgListener redisAlarmMsgListener; @Autowired private RedisStreamMsgListener redisStreamMsgListener; @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; @Bean public JedisPool jedisPool() { @@ -98,6 +106,8 @@ container.setConnectionFactory(connectionFactory); container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS)); container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY)); return container; } src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -72,6 +72,11 @@ private String mediaServerId; /** * 使用的服务的ID */ private String serverId; /** * invite的callId */ private String CallId; @@ -259,4 +264,12 @@ public void setOnlyAudio(boolean onlyAudio) { this.onlyAudio = onlyAudio; } public String getServerId() { return serverId; } public void setServerId(String serverId) { this.serverId = serverId; } } src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
@@ -71,7 +71,9 @@ String gbId = gbStream.getGbId(); GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId); if (gpsMsgInfo != null) { // 无最新位置不发送 logger.info("无最新位置不发送"); if (logger.isDebugEnabled()) { logger.debug("无最新位置不发送"); } // 经纬度都为0不发送 if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) { continue; src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -16,6 +16,8 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.SerializeUtils; @@ -43,7 +45,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class); private String method = "ACK"; private final String method = "ACK"; @Autowired private SIPProcessorObserver sipProcessorObserver; @@ -77,6 +79,9 @@ @Autowired private ISIPCommanderForPlatform commanderForPlatform; @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; /** @@ -114,78 +119,41 @@ param.put("pt", sendRtpItem.getPt()); param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); if (jsonObject == null) { logger.error("RTP推流失败: 请检查ZLM服务"); } else if (jsonObject.getInteger("code") == 0) { logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); sendRtpItem.setDialog(dialogByteArray); byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); sendRtpItem.setTransaction(transactionByteArray); redisCatchStorage.updateSendRTPSever(sendRtpItem); } else { logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param)); if (sendRtpItem.isOnlyAudio()) { // TODO 可能是语音对讲 }else { // 向上级平台 commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); } if (mediaInfo == null) { RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, jsonObject->{ startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader); }); }else { JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader); } // if (streamInfo == null) { // 流还没上来,对方就回复ack // logger.info("监听流以等待流上线1 rtp/{}", sendRtpItem.getStreamId()); // // 监听流上线 // // 添加订阅 // JSONObject subscribeKey = new JSONObject(); // subscribeKey.put("app", "rtp"); // subscribeKey.put("stream", sendRtpItem.getStreamId()); // subscribeKey.put("regist", true); // subscribeKey.put("schema", "rtmp"); // subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); // subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, // (MediaServerItem mediaServerItemInUse, JSONObject json)->{ // Map<String, Object> param = new HashMap<>(); // param.put("vhost","__defaultVhost__"); // param.put("app",json.getString("app")); // param.put("stream",json.getString("stream")); // param.put("ssrc", sendRtpItem.getSsrc()); // param.put("dst_url",sendRtpItem.getIp()); // param.put("dst_port", sendRtpItem.getPort()); // param.put("is_udp", is_Udp); // param.put("src_port", sendRtpItem.getLocalPort()); // zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); // }); // }else { // Map<String, Object> param = new HashMap<>(); // param.put("vhost","__defaultVhost__"); // param.put("app",streamInfo.getApp()); // param.put("stream",streamInfo.getStream()); // param.put("ssrc", sendRtpItem.getSsrc()); // param.put("dst_url",sendRtpItem.getIp()); // param.put("dst_port", sendRtpItem.getPort()); // param.put("is_udp", is_Udp); // param.put("src_port", sendRtpItem.getLocalPort()); // // JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); // if (jsonObject.getInteger("code") != 0) { // logger.info("监听流以等待流上线2 {}/{}", streamInfo.getApp(), streamInfo.getStream()); // // 监听流上线 // // 添加订阅 // JSONObject subscribeKey = new JSONObject(); // subscribeKey.put("app", "rtp"); // subscribeKey.put("stream", streamInfo.getStream()); // subscribeKey.put("regist", true); // subscribeKey.put("schema", "rtmp"); // subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); // subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, // (MediaServerItem mediaServerItemInUse, JSONObject json)->{ // zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); // }); // } // } } } private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) { if (jsonObject == null) { logger.error("RTP推流失败: 请检查ZLM服务"); } else if (jsonObject.getInteger("code") == 0) { logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); sendRtpItem.setDialog(dialogByteArray); byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); sendRtpItem.setTransaction(transactionByteArray); redisCatchStorage.updateSendRTPSever(sendRtpItem); } else { logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param)); if (sendRtpItem.isOnlyAudio()) { // TODO 可能是语音对讲 }else { // 向上级平台 commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); } } } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -107,13 +107,9 @@ cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null); } if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); messageForPushChannel.setType(0); messageForPushChannel.setGbId(sendRtpItem.getChannelId()); messageForPushChannel.setApp(sendRtpItem.getApp()); messageForPushChannel.setStream(sendRtpItem.getStreamId()); messageForPushChannel.setMediaServerId(sendRtpItem.getMediaServerId()); messageForPushChannel.setPlatFormId(sendRtpItem.getPlatformId()); MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId()); redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java
@@ -15,7 +15,7 @@ @Component public class CancelRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { private String method = "CANCEL"; private final String method = "CANCEL"; @Autowired private SIPProcessorObserver sipProcessorObserver; src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -17,10 +17,13 @@ import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -50,562 +53,709 @@ @Component public class InviteRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { private final static Logger logger = LoggerFactory.getLogger(InviteRequestProcessor.class); private final static Logger logger = LoggerFactory.getLogger(InviteRequestProcessor.class); private String method = "INVITE"; private final String method = "INVITE"; @Autowired private SIPCommanderFroPlatform cmderFroPlatform; @Autowired private SIPCommanderFroPlatform cmderFroPlatform; @Autowired private IVideoManagerStorage storager; @Autowired private IVideoManagerStorage storager; @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private IStreamPushService streamPushService; @Autowired private DynamicTask dynamicTask; @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private SIPCommander cmder; @Autowired private DynamicTask dynamicTask; @Autowired private IPlayService playService; @Autowired private SIPCommander cmder; @Autowired private ISIPCommander commander; @Autowired private IPlayService playService; @Autowired private ZLMRTPServerFactory zlmrtpServerFactory; @Autowired private ISIPCommander commander; @Autowired private IMediaServerService mediaServerService; @Autowired private ZLMRTPServerFactory zlmrtpServerFactory; @Autowired private SIPProcessorObserver sipProcessorObserver; @Autowired private IMediaServerService mediaServerService; @Autowired private VideoStreamSessionManager sessionManager; @Autowired private SIPProcessorObserver sipProcessorObserver; @Autowired private UserSetting userSetting; @Autowired private VideoStreamSessionManager sessionManager; @Autowired private ZLMMediaListManager mediaListManager; @Autowired private UserSetting userSetting; @Autowired private ZLMMediaListManager mediaListManager; @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 sipProcessorObserver.addRequestProcessor(method, this); } /** * 处理invite请求 * * @param evt * 请求消息 */ @Override public void process(RequestEvent evt) { // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 try { Request request = evt.getRequest(); SipURI sipURI = (SipURI) request.getRequestURI(); //从subject读取channelId,不再从request-line读取。 有些平台request-line是平台国标编码,不是设备国标编码。 //String channelId = sipURI.getUser(); String channelId = SipUtils.getChannelIdFromHeader(request); String requesterId = SipUtils.getUserIdFromFromHeader(request); CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); if (requesterId == null || channelId == null) { logger.info("无法从FromHeader的Address中获取到平台id,返回400"); responseAck(evt, Response.BAD_REQUEST); // 参数不全, 发400,请求错误 return; } // 查询请求是否来自上级平台\设备 ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); if (platform == null) { inviteFromDeviceHandle(evt, requesterId); }else { // 查询平台下是否有该通道 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); PlatformCatalog catalog = storager.getCatalog(channelId); MediaServerItem mediaServerItem = null; // 不是通道可能是直播流 if (channel != null && gbStream == null ) { if (channel.getStatus() == 0) { logger.info("通道离线,返回400"); responseAck(evt, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline"); return; } responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 }else if(channel == null && gbStream != null){ String mediaServerId = gbStream.getMediaServerId(); mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem == null) { logger.info("[ app={}, stream={} ]找不到zlm {},返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId); responseAck(evt, Response.GONE); return; } responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 }else if (catalog != null) { responseAck(evt, Response.BAD_REQUEST, "catalog channel can not play"); // 目录不支持点播 return; } else { logger.info("通道不存在,返回404"); responseAck(evt, Response.NOT_FOUND); // 通道不存在,发404,资源不存在 return; } // 解析sdp消息, 使用jainsip 自带的sdp解析方式 String contentString = new String(request.getRawContent()); // jainSip不支持y=字段, 移除以解析。 int ssrcIndex = contentString.indexOf("y="); // 检查是否有y字段 String ssrcDefault = "0000000000"; String ssrc; SessionDescription sdp; if (ssrcIndex >= 0) { //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); String substring = contentString.substring(0, contentString.indexOf("y=")); sdp = SdpFactory.getInstance().createSessionDescription(substring); }else { ssrc = ssrcDefault; sdp = SdpFactory.getInstance().createSessionDescription(contentString); } String sessionName = sdp.getSessionName().getValue(); Long startTime = null; Long stopTime = null; Instant start = null; Instant end = null; if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) { TimeDescriptionImpl timeDescription = (TimeDescriptionImpl)(sdp.getTimeDescriptions(false).get(0)); TimeField startTimeFiled = (TimeField)timeDescription.getTime(); startTime = startTimeFiled.getStartTime(); stopTime = startTimeFiled.getStopTime(); start = Instant.ofEpochSecond(startTime); end = Instant.ofEpochSecond(stopTime); } // 获取支持的格式 Vector mediaDescriptions = sdp.getMediaDescriptions(true); // 查看是否支持PS 负载96 //String ip = null; int port = -1; boolean mediaTransmissionTCP = false; Boolean tcpActive = null; for (Object description : mediaDescriptions) { MediaDescription mediaDescription = (MediaDescription) description; Media media = mediaDescription.getMedia(); Vector mediaFormats = media.getMediaFormats(false); if (mediaFormats.contains("96")) { port = media.getMediaPort(); //String mediaType = media.getMediaType(); String protocol = media.getProtocol(); // 区分TCP发流还是udp, 当前默认udp if ("TCP/RTP/AVP".equals(protocol)) { String setup = mediaDescription.getAttribute("setup"); if (setup != null) { mediaTransmissionTCP = true; if ("active".equals(setup)) { tcpActive = true; // 不支持tcp主动 responseAck(evt, Response.NOT_IMPLEMENTED, "tcp active not support"); // 目录不支持点播 return; } else if ("passive".equals(setup)) { tcpActive = false; } } } break; } } if (port == -1) { logger.info("不支持的媒体格式,返回415"); // 回复不支持的格式 responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 return; } String username = sdp.getOrigin().getUsername(); String addressStr = sdp.getOrigin().getAddress(); logger.info("[上级点播]用户:{}, 通道:{}, 地址:{}:{}, ssrc:{}", username, channelId, addressStr, port, ssrc); Device device = null; // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 if (channel != null) { device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId); if (device == null) { logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel); responseAck(evt, Response.SERVER_INTERNAL_ERROR); return; } mediaServerItem = playService.getNewMediaServerItem(device); if (mediaServerItem == null) { logger.warn("未找到可用的zlm"); responseAck(evt, Response.BUSY_HERE); return; } SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, device.getDeviceId(), channelId, mediaTransmissionTCP); if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); } if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); responseAck(evt, Response.BUSY_HERE); return; } sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setPlayType("Play".equals(sessionName)?InviteStreamType.PLAY:InviteStreamType.PLAYBACK); Long finalStartTime = startTime; Long finalStopTime = stopTime; ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{ String app = responseJSON.getString("app"); String stream = responseJSON.getString("stream"); logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", app, stream); // * 0 等待设备推流上来 // * 1 下级已经推流,等待上级平台回复ack // * 2 推流中 sendRtpItem.setStatus(1); redisCatchStorage.updateSendRTPSever(sendRtpItem); StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n"); content.append("s=" + sessionName+"\r\n"); content.append("c=IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n"); if ("Playback".equals(sessionName)) { content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); }else { content.append("t=0 0\r\n"); } content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); content.append("a=sendonly\r\n"); content.append("a=rtpmap:96 PS/90000\r\n"); content.append("y="+ ssrc + "\r\n"); content.append("f=\r\n"); try { // 超时未收到Ack应该回复bye,当前等待时间为10秒 dynamicTask.startDelay(callIdHeader.getCallId(), ()->{ logger.info("Ack 等待超时"); mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), ssrc); // 回复bye cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); }, 60*1000); responseSdpAck(evt, content.toString(), platform); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } }; SipSubscribe.Event errorEvent = ((event) -> { // 未知错误。直接转发设备点播的错误 Response response = null; try { response = getMessageFactory().createResponse(event.statusCode, evt.getRequest()); ServerTransaction serverTransaction = getServerTransaction(evt); serverTransaction.sendResponse(response); if (serverTransaction.getDialog() != null) { serverTransaction.getDialog().delete(); } } catch (ParseException | SipException | InvalidArgumentException e) { e.printStackTrace(); } }); sendRtpItem.setApp("rtp"); if ("Playback".equals(sessionName)) { sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true, true); sendRtpItem.setStreamId(ssrcInfo.getStream()); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), DateUtil.formatter.format(end), null, result -> { if (result.getCode() != 0){ logger.warn("录像回放失败"); if (result.getEvent() != null) { errorEvent.response(result.getEvent()); } redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); try { responseAck(evt, Response.REQUEST_TIMEOUT); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } }else { if (result.getMediaServerItem() != null) { hookEvent.response(result.getMediaServerItem(), result.getResponse()); } } }); }else { sendRtpItem.setPlayType(InviteStreamType.PLAY); SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); if (playTransaction != null) { Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream()); if (!streamReady) { playTransaction = null; } } if (playTransaction == null) { String streamId = null; if (mediaServerItem.isRtpEnable()) { streamId = String.format("%s_%s", device.getDeviceId(), channelId); } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false); sendRtpItem.setStreamId(ssrcInfo.getStream()); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg)->{ logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId); redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); }, null); }else { sendRtpItem.setStreamId(playTransaction.getStream()); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); JSONObject jsonObject = new JSONObject(); jsonObject.put("app", sendRtpItem.getApp()); jsonObject.put("stream", sendRtpItem.getStreamId()); hookEvent.response(mediaServerItem, jsonObject); } } }else if (gbStream != null) { Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); if (!streamReady ) { if ("proxy".equals(gbStream.getStreamType())) { // TODO 控制启用以使设备上线 logger.info("[ app={}, stream={} ]通道离线,启用流后开始推流",gbStream.getApp(), gbStream.getStream()); responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); }else if ("push".equals(gbStream.getStreamType())) { if (!platform.isStartOfflinePush()) { responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable"); return; } // 发送redis消息以使设备上线 logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流",gbStream.getApp(), gbStream.getStream()); MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); messageForPushChannel.setType(1); messageForPushChannel.setGbId(gbStream.getGbId()); messageForPushChannel.setApp(gbStream.getApp()); messageForPushChannel.setStream(gbStream.getStream()); // TODO 获取低负载的节点 messageForPushChannel.setMediaServerId(gbStream.getMediaServerId()); messageForPushChannel.setPlatFormId(platform.getServerGBId()); messageForPushChannel.setPlatFormName(platform.getName()); redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); // 设置超时 dynamicTask.startDelay(callIdHeader.getCallId(), ()->{ logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream()); try { mediaListManager.removedChannelOnlineEventLister(gbStream.getGbId()); responseAck(evt, Response.REQUEST_TIMEOUT); // 超时 } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } }, userSetting.getPlatformPlayTimeout()); // 添加监听 MediaServerItem finalMediaServerItem = mediaServerItem; int finalPort = port; boolean finalMediaTransmissionTCP = mediaTransmissionTCP; Boolean finalTcpActive = tcpActive; mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream)->{ SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(finalMediaServerItem, addressStr, finalPort, ssrc, requesterId, app, stream, channelId, finalMediaTransmissionTCP); if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); try { responseAck(evt, Response.BUSY_HERE); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } return; } if (finalTcpActive != null) { sendRtpItem.setTcpActive(finalTcpActive); } sendRtpItem.setPlayType(InviteStreamType.PUSH); // 写入redis, 超时时回复 sendRtpItem.setStatus(1); sendRtpItem.setCallId(callIdHeader.getCallId()); byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); sendRtpItem.setDialog(dialogByteArray); byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); sendRtpItem.setTransaction(transactionByteArray); redisCatchStorage.updateSendRTPSever(sendRtpItem); sendStreamAck(finalMediaServerItem, sendRtpItem, platform, evt); }); } }else { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP); @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); responseAck(evt, Response.BUSY_HERE); return; } if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); } sendRtpItem.setPlayType(InviteStreamType.PUSH); // 写入redis, 超时时回复 sendRtpItem.setStatus(1); sendRtpItem.setCallId(callIdHeader.getCallId()); byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); sendRtpItem.setDialog(dialogByteArray); byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); sendRtpItem.setTransaction(transactionByteArray); redisCatchStorage.updateSendRTPSever(sendRtpItem); sendStreamAck(mediaServerItem, sendRtpItem, platform, evt); } @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 sipProcessorObserver.addRequestProcessor(method, this); } /** * 处理invite请求 * * @param evt 请求消息 */ @Override public void process(RequestEvent evt) { // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 try { Request request = evt.getRequest(); SipURI sipUri = (SipURI) request.getRequestURI(); //从subject读取channelId,不再从request-line读取。 有些平台request-line是平台国标编码,不是设备国标编码。 //String channelId = sipURI.getUser(); String channelId = SipUtils.getChannelIdFromHeader(request); String requesterId = SipUtils.getUserIdFromFromHeader(request); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); if (requesterId == null || channelId == null) { logger.info("无法从FromHeader的Address中获取到平台id,返回400"); // 参数不全, 发400,请求错误 responseAck(evt, Response.BAD_REQUEST); return; } // 查询请求是否来自上级平台\设备 ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); if (platform == null) { inviteFromDeviceHandle(evt, requesterId); } else { // 查询平台下是否有该通道 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); PlatformCatalog catalog = storager.getCatalog(channelId); MediaServerItem mediaServerItem = null; StreamPushItem streamPushItem = null; // 不是通道可能是直播流 if (channel != null && gbStream == null) { if (channel.getStatus() == 0) { logger.info("通道离线,返回400"); responseAck(evt, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline"); return; } responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 } else if (channel == null && gbStream != null) { String mediaServerId = gbStream.getMediaServerId(); mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem == null) { if ("proxy".equals(gbStream.getStreamType())) { logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); responseAck(evt, Response.GONE); return; } else { streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) { logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); responseAck(evt, Response.GONE); return; } } } else { if ("push".equals(gbStream.getStreamType())) { streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); if (streamPushItem == null) { logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); responseAck(evt, Response.GONE); return; } } } responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 } else if (catalog != null) { responseAck(evt, Response.BAD_REQUEST, "catalog channel can not play"); // 目录不支持点播 return; } else { logger.info("通道不存在,返回404"); responseAck(evt, Response.NOT_FOUND); // 通道不存在,发404,资源不存在 return; } // 解析sdp消息, 使用jainsip 自带的sdp解析方式 String contentString = new String(request.getRawContent()); // jainSip不支持y=字段, 移除以解析。 int ssrcIndex = contentString.indexOf("y="); // 检查是否有y字段 String ssrcDefault = "0000000000"; String ssrc; SessionDescription sdp; if (ssrcIndex >= 0) { //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); String substring = contentString.substring(0, contentString.indexOf("y=")); sdp = SdpFactory.getInstance().createSessionDescription(substring); } else { ssrc = ssrcDefault; sdp = SdpFactory.getInstance().createSessionDescription(contentString); } String sessionName = sdp.getSessionName().getValue(); Long startTime = null; Long stopTime = null; Instant start = null; Instant end = null; if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) { TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) (sdp.getTimeDescriptions(false).get(0)); TimeField startTimeFiled = (TimeField) timeDescription.getTime(); startTime = startTimeFiled.getStartTime(); stopTime = startTimeFiled.getStopTime(); start = Instant.ofEpochSecond(startTime); end = Instant.ofEpochSecond(stopTime); } // 获取支持的格式 Vector mediaDescriptions = sdp.getMediaDescriptions(true); // 查看是否支持PS 负载96 //String ip = null; int port = -1; boolean mediaTransmissionTCP = false; Boolean tcpActive = null; for (Object description : mediaDescriptions) { MediaDescription mediaDescription = (MediaDescription) description; Media media = mediaDescription.getMedia(); Vector mediaFormats = media.getMediaFormats(false); if (mediaFormats.contains("96")) { port = media.getMediaPort(); //String mediaType = media.getMediaType(); String protocol = media.getProtocol(); // 区分TCP发流还是udp, 当前默认udp if ("TCP/RTP/AVP".equals(protocol)) { String setup = mediaDescription.getAttribute("setup"); if (setup != null) { mediaTransmissionTCP = true; if ("active".equals(setup)) { tcpActive = true; // 不支持tcp主动 responseAck(evt, Response.NOT_IMPLEMENTED, "tcp active not support"); // 目录不支持点播 return; } else if ("passive".equals(setup)) { tcpActive = false; } } } break; } } if (port == -1) { logger.info("不支持的媒体格式,返回415"); // 回复不支持的格式 responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 return; } String username = sdp.getOrigin().getUsername(); String addressStr = sdp.getOrigin().getAddress(); logger.info("[上级点播]用户:{}, 通道:{}, 地址:{}:{}, ssrc:{}", username, channelId, addressStr, port, ssrc); Device device = null; // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 if (channel != null) { device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId); if (device == null) { logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel); responseAck(evt, Response.SERVER_INTERNAL_ERROR); return; } mediaServerItem = playService.getNewMediaServerItem(device); if (mediaServerItem == null) { logger.warn("未找到可用的zlm"); responseAck(evt, Response.BUSY_HERE); return; } SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, device.getDeviceId(), channelId, mediaTransmissionTCP); if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); } if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); responseAck(evt, Response.BUSY_HERE); return; } sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setPlayType("Play".equals(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); Long finalStartTime = startTime; Long finalStopTime = stopTime; ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> { String app = responseJSON.getString("app"); String stream = responseJSON.getString("stream"); logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", app, stream); // * 0 等待设备推流上来 // * 1 下级已经推流,等待上级平台回复ack // * 2 推流中 sendRtpItem.setStatus(1); redisCatchStorage.updateSendRTPSever(sendRtpItem); StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); content.append("s=" + sessionName + "\r\n"); content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); if ("Playback".equals(sessionName)) { content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); } else { content.append("t=0 0\r\n"); } content.append("m=video " + sendRtpItem.getLocalPort() + " RTP/AVP 96\r\n"); content.append("a=sendonly\r\n"); content.append("a=rtpmap:96 PS/90000\r\n"); content.append("y=" + ssrc + "\r\n"); content.append("f=\r\n"); try { // 超时未收到Ack应该回复bye,当前等待时间为10秒 dynamicTask.startDelay(callIdHeader.getCallId(), () -> { logger.info("Ack 等待超时"); mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), ssrc); // 回复bye cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); }, 60 * 1000); responseSdpAck(evt, content.toString(), platform); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } }; SipSubscribe.Event errorEvent = ((event) -> { // 未知错误。直接转发设备点播的错误 Response response = null; try { response = getMessageFactory().createResponse(event.statusCode, evt.getRequest()); ServerTransaction serverTransaction = getServerTransaction(evt); serverTransaction.sendResponse(response); if (serverTransaction.getDialog() != null) { serverTransaction.getDialog().delete(); } } catch (ParseException | SipException | InvalidArgumentException e) { e.printStackTrace(); } }); sendRtpItem.setApp("rtp"); if ("Playback".equals(sessionName)) { sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true, true); sendRtpItem.setStreamId(ssrcInfo.getStream()); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), DateUtil.formatter.format(end), null, result -> { if (result.getCode() != 0) { logger.warn("录像回放失败"); if (result.getEvent() != null) { errorEvent.response(result.getEvent()); } redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); try { responseAck(evt, Response.REQUEST_TIMEOUT); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } } else { if (result.getMediaServerItem() != null) { hookEvent.response(result.getMediaServerItem(), result.getResponse()); } } }); } else { sendRtpItem.setPlayType(InviteStreamType.PLAY); SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); if (playTransaction != null) { Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream()); if (!streamReady) { playTransaction = null; } } if (playTransaction == null) { String streamId = null; if (mediaServerItem.isRtpEnable()) { streamId = String.format("%s_%s", device.getDeviceId(), channelId); } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false); sendRtpItem.setStreamId(ssrcInfo.getStream()); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> { logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId); redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); }, null); } else { sendRtpItem.setStreamId(playTransaction.getStream()); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); JSONObject jsonObject = new JSONObject(); jsonObject.put("app", sendRtpItem.getApp()); jsonObject.put("stream", sendRtpItem.getStreamId()); hookEvent.response(mediaServerItem, jsonObject); } } } else if (gbStream != null) { if (streamPushItem.isStatus()) { // 在线状态 pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } else { // 不在线 拉起 notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } } } } catch (SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); logger.warn("sdp解析错误"); e.printStackTrace(); } catch (SdpParseException e) { e.printStackTrace(); } catch (SdpException e) { e.printStackTrace(); } } /** * 安排推流 */ private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, CallIdHeader callIdHeader, MediaServerItem mediaServerItem, int port, Boolean tcpActive, boolean mediaTransmissionTCP, String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { // 推流 if (streamPushItem.getServerId().equals(userSetting.getServerId())) { Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); if (streamReady) { // 自平台内容 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP); if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); responseAck(evt, Response.BUSY_HERE); return; } if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); } sendRtpItem.setPlayType(InviteStreamType.PUSH); // 写入redis, 超时时回复 sendRtpItem.setStatus(1); sendRtpItem.setCallId(callIdHeader.getCallId()); byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); sendRtpItem.setDialog(dialogByteArray); byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); sendRtpItem.setTransaction(transactionByteArray); redisCatchStorage.updateSendRTPSever(sendRtpItem); sendStreamAck(mediaServerItem, sendRtpItem, platform, evt); } else { // 不在线 拉起 notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } } else { // 其他平台内容 otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } } /** * 通知流上线 */ private void notifyStreamOnline(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, CallIdHeader callIdHeader, MediaServerItem mediaServerItem, int port, Boolean tcpActive, boolean mediaTransmissionTCP, String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { if ("proxy".equals(gbStream.getStreamType())) { // TODO 控制启用以使设备上线 logger.info("[ app={}, stream={} ]通道离线,启用流后开始推流", gbStream.getApp(), gbStream.getStream()); responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); } else if ("push".equals(gbStream.getStreamType())) { if (!platform.isStartOfflinePush()) { responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable"); return; } // 发送redis消息以使设备上线 logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream()); MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), platform.getName(), null, gbStream.getMediaServerId()); redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); // 设置超时 dynamicTask.startDelay(callIdHeader.getCallId(), () -> { logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream()); try { mediaListManager.removedChannelOnlineEventLister(gbStream.getGbId()); responseAck(evt, Response.REQUEST_TIMEOUT); // 超时 } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } }, userSetting.getPlatformPlayTimeout()); // 添加监听 int finalPort = port; Boolean finalTcpActive = tcpActive; // 添加在本机上线的通知 mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream, serverId) -> { dynamicTask.stop(callIdHeader.getCallId()); if (serverId.equals(userSetting.getServerId())) { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, app, stream, channelId, mediaTransmissionTCP); if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); try { responseAck(evt, Response.BUSY_HERE); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } return; } if (finalTcpActive != null) { sendRtpItem.setTcpActive(finalTcpActive); } sendRtpItem.setPlayType(InviteStreamType.PUSH); // 写入redis, 超时时回复 sendRtpItem.setStatus(1); sendRtpItem.setCallId(callIdHeader.getCallId()); byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); sendRtpItem.setDialog(dialogByteArray); byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); sendRtpItem.setTransaction(transactionByteArray); redisCatchStorage.updateSendRTPSever(sendRtpItem); sendStreamAck(mediaServerItem, sendRtpItem, platform, evt); } else { // 其他平台内容 otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } }); } } /** * 来自其他wvp的推流 */ private void otherWvpPushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, CallIdHeader callIdHeader, MediaServerItem mediaServerItem, int port, Boolean tcpActive, boolean mediaTransmissionTCP, String channelId, String addressStr, String ssrc, String requesterId) { logger.info("[级联点播]直播流来自其他平台,发送redis消息"); // 发送redis消息 redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(), streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId, channelId, mediaTransmissionTCP, null, responseSendItemMsg -> { SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem(); if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) { logger.warn("服务器端口资源不足"); try { responseAck(evt, Response.BUSY_HERE); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } return; } // 收到sendItem if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); } sendRtpItem.setPlayType(InviteStreamType.PUSH); // 写入redis, 超时时回复 sendRtpItem.setStatus(1); sendRtpItem.setCallId(callIdHeader.getCallId()); byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); sendRtpItem.setDialog(dialogByteArray); byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); sendRtpItem.setTransaction(transactionByteArray); redisCatchStorage.updateSendRTPSever(sendRtpItem); sendStreamAck(responseSendItemMsg.getMediaServerItem(), sendRtpItem, platform, evt); }, (wvpResult) -> { try { // 错误 if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) { // 离线 // 查询是否在本机上线了 StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream()); if (currentStreamPushItem.isStatus()) { // 在线状态 pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } else { // 不在线 拉起 notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } } } catch (InvalidArgumentException e) { throw new RuntimeException(e); } catch (ParseException e) { throw new RuntimeException(e); } catch (SipException e) { throw new RuntimeException(e); } } try { responseAck(evt, Response.BUSY_HERE); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } return; }); } } public void sendStreamAck(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) { } catch (SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); logger.warn("sdp解析错误"); e.printStackTrace(); } catch (SdpParseException e) { e.printStackTrace(); } catch (SdpException e) { e.printStackTrace(); } } StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o=" + sendRtpItem.getChannelId() + " 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); content.append("s=Play\r\n"); content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); content.append("t=0 0\r\n"); content.append("m=video " + sendRtpItem.getLocalPort() + " RTP/AVP 96\r\n"); content.append("a=sendonly\r\n"); content.append("a=rtpmap:96 PS/90000\r\n"); if (sendRtpItem.isTcp()) { content.append("a=connection:new\r\n"); if (!sendRtpItem.isTcpActive()) { content.append("a=setup:active\r\n"); } else { content.append("a=setup:passive\r\n"); } } content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); content.append("f=\r\n"); public void sendStreamAck(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt){ try { responseSdpAck(evt, content.toString(), platform); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } } StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+ sendRtpItem.getChannelId() +" 0 0 IN IP4 "+ mediaServerItem.getSdpIp()+"\r\n"); content.append("s=Play\r\n"); content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("t=0 0\r\n"); content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); content.append("a=sendonly\r\n"); content.append("a=rtpmap:96 PS/90000\r\n"); if (sendRtpItem.isTcp()) { content.append("a=connection:new\r\n"); if (!sendRtpItem.isTcpActive()) { content.append("a=setup:active\r\n"); }else { content.append("a=setup:passive\r\n"); } } content.append("y="+ sendRtpItem.getSsrc() + "\r\n"); content.append("f=\r\n"); public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException { try { responseSdpAck(evt, content.toString(), platform); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } } // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) Device device = redisCatchStorage.getDevice(requesterId); Request request = evt.getRequest(); if (device != null) { logger.info("收到设备" + requesterId + "的语音广播Invite请求"); responseAck(evt, Response.TRYING); public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException { String contentString = new String(request.getRawContent()); // jainSip不支持y=字段, 移除移除以解析。 String substring = contentString; String ssrc = "0000000404"; int ssrcIndex = contentString.indexOf("y="); if (ssrcIndex > 0) { substring = contentString.substring(0, ssrcIndex); ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); } ssrcIndex = substring.indexOf("f="); if (ssrcIndex > 0) { substring = contentString.substring(0, ssrcIndex); } SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) Device device = redisCatchStorage.getDevice(requesterId); Request request = evt.getRequest(); if (device != null) { logger.info("收到设备" + requesterId + "的语音广播Invite请求"); responseAck(evt, Response.TRYING); // 获取支持的格式 Vector mediaDescriptions = sdp.getMediaDescriptions(true); // 查看是否支持PS 负载96 int port = -1; //boolean recvonly = false; boolean mediaTransmissionTCP = false; Boolean tcpActive = null; for (int i = 0; i < mediaDescriptions.size(); i++) { MediaDescription mediaDescription = (MediaDescription) mediaDescriptions.get(i); Media media = mediaDescription.getMedia(); String contentString = new String(request.getRawContent()); // jainSip不支持y=字段, 移除移除以解析。 String substring = contentString; String ssrc = "0000000404"; int ssrcIndex = contentString.indexOf("y="); if (ssrcIndex > 0) { substring = contentString.substring(0, ssrcIndex); ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); } ssrcIndex = substring.indexOf("f="); if (ssrcIndex > 0) { substring = contentString.substring(0, ssrcIndex); } SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); Vector mediaFormats = media.getMediaFormats(false); if (mediaFormats.contains("8")) { port = media.getMediaPort(); String protocol = media.getProtocol(); // 区分TCP发流还是udp, 当前默认udp if ("TCP/RTP/AVP".equals(protocol)) { String setup = mediaDescription.getAttribute("setup"); if (setup != null) { mediaTransmissionTCP = true; if ("active".equals(setup)) { tcpActive = true; } else if ("passive".equals(setup)) { tcpActive = false; } } } break; } } if (port == -1) { logger.info("不支持的媒体格式,返回415"); // 回复不支持的格式 responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 return; } String username = sdp.getOrigin().getUsername(); String addressStr = sdp.getOrigin().getAddress(); logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc); // 获取支持的格式 Vector mediaDescriptions = sdp.getMediaDescriptions(true); // 查看是否支持PS 负载96 int port = -1; //boolean recvonly = false; boolean mediaTransmissionTCP = false; Boolean tcpActive = null; for (int i = 0; i < mediaDescriptions.size(); i++) { MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i); Media media = mediaDescription.getMedia(); Vector mediaFormats = media.getMediaFormats(false); if (mediaFormats.contains("8")) { port = media.getMediaPort(); String protocol = media.getProtocol(); // 区分TCP发流还是udp, 当前默认udp if ("TCP/RTP/AVP".equals(protocol)) { String setup = mediaDescription.getAttribute("setup"); if (setup != null) { mediaTransmissionTCP = true; if ("active".equals(setup)) { tcpActive = true; } else if ("passive".equals(setup)) { tcpActive = false; } } } break; } } if (port == -1) { logger.info("不支持的媒体格式,返回415"); // 回复不支持的格式 responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 return; } String username = sdp.getOrigin().getUsername(); String addressStr = sdp.getOrigin().getAddress(); logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc); } else { logger.warn("来自无效设备/平台的请求"); responseAck(evt, Response.BAD_REQUEST); } } } else { logger.warn("来自无效设备/平台的请求"); responseAck(evt, Response.BAD_REQUEST); } } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
@@ -41,7 +41,7 @@ private final Logger logger = LoggerFactory.getLogger(RegisterRequestProcessor.class); public String method = "REGISTER"; public final String method = "REGISTER"; @Autowired private SipConfig sipConfig; src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java
@@ -40,9 +40,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { private Logger logger = LoggerFactory.getLogger(RecordInfoResponseMessageHandler.class); public static volatile List<String> threadNameList = new ArrayList(); private final String cmdType = "RecordInfo"; private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_"; private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java
@@ -17,7 +17,7 @@ @Component public class ByeResponseProcessor extends SIPResponseProcessorAbstract { private String method = "BYE"; private final String method = "BYE"; @Autowired private SipLayer sipLayer; src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java
@@ -17,7 +17,7 @@ @Component public class CancelResponseProcessor extends SIPResponseProcessorAbstract { private String method = "CANCEL"; private final String method = "CANCEL"; @Autowired private SipLayer sipLayer; src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java
@@ -31,7 +31,7 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract { private final static Logger logger = LoggerFactory.getLogger(InviteResponseProcessor.class); private String method = "INVITE"; private final String method = "INVITE"; @Autowired private SipLayer sipLayer; src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
@@ -27,7 +27,7 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { private Logger logger = LoggerFactory.getLogger(RegisterResponseProcessor.class); private String method = "REGISTER"; private final String method = "REGISTER"; @Autowired private ISIPCommanderForPlatform sipCommanderForPlatform; src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -397,21 +397,22 @@ if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { streamPushItem = zlmMediaListManager.addPush(item); item.setSeverId(userSetting.getServerId()); zlmMediaListManager.addPush(item); } List<GbStream> gbStreams = new ArrayList<>(); if (streamPushItem == null || streamPushItem.getGbId() == null) { GbStream gbStream = storager.getGbStream(app, streamId); gbStreams.add(gbStream); }else { if (streamPushItem.getGbId() != null) { gbStreams.add(streamPushItem); } } if (gbStreams.size() > 0) { // List<GbStream> gbStreams = new ArrayList<>(); // if (streamPushItem == null || streamPushItem.getGbId() == null) { // GbStream gbStream = storager.getGbStream(app, streamId); // gbStreams.add(gbStream); // }else { // if (streamPushItem.getGbId() != null) { // gbStreams.add(streamPushItem); // } // } // if (gbStreams.size() > 0) { // eventPublisher.catalogEventPublishForStream(null, gbStreams, CatalogEvent.ON); } // } }else { // 兼容流注销时类型从redis记录获取 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
@@ -24,6 +24,9 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; /** * @author lin */ @Component public class ZLMMediaListManager { @@ -147,7 +150,6 @@ } } } // StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(transform.getApp(), transform.getStream()); List<GbStream> gbStreamList = gbStreamMapper.selectByGBId(transform.getGbId()); if (gbStreamList != null && gbStreamList.size() == 1) { transform.setGbStreamId(gbStreamList.get(0).getGbStreamId()); @@ -162,12 +164,11 @@ } if (transform != null) { if (channelOnlineEvents.get(transform.getGbId()) != null) { channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream()); channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream(), transform.getServerId()); channelOnlineEvents.remove(transform.getGbId()); } } } storager.updateMedia(transform); return transform; src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -2,6 +2,7 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import org.slf4j.Logger; @@ -19,6 +20,9 @@ @Autowired private ZLMRESTfulUtils zlmresTfulUtils; @Autowired private UserSetting userSetting; private int[] portRangeArray = new int[2]; @@ -197,6 +201,7 @@ sendRtpItem.setTcp(tcp); sendRtpItem.setApp("rtp"); sendRtpItem.setLocalPort(localPort); sendRtpItem.setServerId(userSetting.getServerId()); sendRtpItem.setMediaServerId(serverItem.getId()); return sendRtpItem; } @@ -238,6 +243,7 @@ sendRtpItem.setChannelId(channelId); sendRtpItem.setTcp(tcp); sendRtpItem.setLocalPort(localPort); sendRtpItem.setServerId(userSetting.getServerId()); sendRtpItem.setMediaServerId(serverItem.getId()); return sendRtpItem; } src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
@@ -1,6 +1,9 @@ package com.genersoft.iot.vmp.media.zlm.dto; /** * @author lin */ public interface ChannelOnlineEvent { void run(String app, String stream); void run(String app, String stream, String serverId); } src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java
@@ -61,9 +61,14 @@ private String originUrl; /** * 服务器id * 流媒体服务器id */ private String mediaServerId; /** * 服务器id */ private String severId; /** * GMT unix系统时间戳,单位秒 @@ -414,4 +419,12 @@ public void setStreamInfo(StreamInfo streamInfo) { this.streamInfo = streamInfo; } public String getSeverId() { return severId; } public void setSeverId(String severId) { this.severId = severId; } } src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java
@@ -81,6 +81,11 @@ */ private String mediaServerId; /** * 使用的服务ID */ private String serverId; public String getVhost() { return vhost; } @@ -219,5 +224,13 @@ public void setMediaServerId(String mediaServerId) { this.mediaServerId = mediaServerId; } public String getServerId() { return serverId; } public void setServerId(String serverId) { this.serverId = serverId; } } src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java
@@ -23,7 +23,6 @@ private IVideoManagerStorage storager; @Scheduled(fixedRate = 30 * 1000) //每30秒执行一次 public void execute(){ List<GPSMsgInfo> gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo(); src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
@@ -1,7 +1,10 @@ package com.genersoft.iot.vmp.service.bean; import java.util.stream.Stream; /** * 当上级平台 * @author lin */ public class MessageForPushChannel { /** @@ -45,6 +48,20 @@ */ private String mediaServerId; public static MessageForPushChannel getInstance(int type, String app, String stream, String gbId, String platFormId, String platFormName, String serverId, String mediaServerId){ MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); messageForPushChannel.setType(type); messageForPushChannel.setGbId(gbId); messageForPushChannel.setApp(app); messageForPushChannel.setStream(stream); messageForPushChannel.setMediaServerId(mediaServerId); messageForPushChannel.setPlatFormId(platFormId); messageForPushChannel.setPlatFormName(platFormName); return messageForPushChannel; } public int getType() { return type; src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java
New file @@ -0,0 +1,170 @@ package com.genersoft.iot.vmp.service.bean; /** * redis消息:请求下级推送流信息 * @author lin */ public class RequestPushStreamMsg { /** * 下级服务ID */ private String mediaServerId; /** * 流ID */ private String app; /** * 应用名 */ private String stream; /** * 目标IP */ private String ip; /** * 目标端口 */ private int port; /** * ssrc */ private String ssrc; /** * 是否使用TCP方式 */ private boolean tcp; /** * 本地使用的端口 */ private int srcPort; /** * 发送时,rtp的pt(uint8_t),不传时默认为96 */ private int pt; /** * 发送时,rtp的负载类型。为true时,负载为ps;为false时,为es; */ private boolean ps; /** * 是否只有音频 */ private boolean onlyAudio; public static RequestPushStreamMsg getInstance(String mediaServerId, String app, String stream, String ip, int port, String ssrc, boolean tcp, int srcPort, int pt, boolean ps, boolean onlyAudio) { RequestPushStreamMsg requestPushStreamMsg = new RequestPushStreamMsg(); requestPushStreamMsg.setMediaServerId(mediaServerId); requestPushStreamMsg.setApp(app); requestPushStreamMsg.setStream(stream); requestPushStreamMsg.setIp(ip); requestPushStreamMsg.setPort(port); requestPushStreamMsg.setSsrc(ssrc); requestPushStreamMsg.setTcp(tcp); requestPushStreamMsg.setSrcPort(srcPort); requestPushStreamMsg.setPt(pt); requestPushStreamMsg.setPs(ps); requestPushStreamMsg.setOnlyAudio(onlyAudio); return requestPushStreamMsg; } public String getMediaServerId() { return mediaServerId; } public void setMediaServerId(String mediaServerId) { this.mediaServerId = mediaServerId; } public String getApp() { return app; } public void setApp(String app) { this.app = app; } public String getStream() { return stream; } public void setStream(String stream) { this.stream = stream; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public String getSsrc() { return ssrc; } public void setSsrc(String ssrc) { this.ssrc = ssrc; } public boolean isTcp() { return tcp; } public void setTcp(boolean tcp) { this.tcp = tcp; } public int getSrcPort() { return srcPort; } public void setSrcPort(int srcPort) { this.srcPort = srcPort; } public int getPt() { return pt; } public void setPt(int pt) { this.pt = pt; } public boolean isPs() { return ps; } public void setPs(boolean ps) { this.ps = ps; } public boolean isOnlyAudio() { return onlyAudio; } public void setOnlyAudio(boolean onlyAudio) { this.onlyAudio = onlyAudio; } } src/main/java/com/genersoft/iot/vmp/service/bean/RequestSendItemMsg.java
New file @@ -0,0 +1,173 @@ package com.genersoft.iot.vmp.service.bean; /** * redis消息:请求下级回复推送信息 * @author lin */ public class RequestSendItemMsg { /** * 下级服务ID */ private String serverId; /** * 下级服务ID */ private String mediaServerId; /** * 流ID */ private String app; /** * 应用名 */ private String stream; /** * 目标IP */ private String ip; /** * 目标端口 */ private int port; /** * ssrc */ private String ssrc; /** * 平台国标编号 */ private String platformId; /** * 平台名称 */ private String platformName; /** * 通道ID */ private String channelId; /** * 是否使用TCP */ private Boolean isTcp; public static RequestSendItemMsg getInstance(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc, String platformId, String channelId, Boolean isTcp, String platformName) { RequestSendItemMsg requestSendItemMsg = new RequestSendItemMsg(); requestSendItemMsg.setServerId(serverId); requestSendItemMsg.setMediaServerId(mediaServerId); requestSendItemMsg.setApp(app); requestSendItemMsg.setStream(stream); requestSendItemMsg.setIp(ip); requestSendItemMsg.setPort(port); requestSendItemMsg.setSsrc(ssrc); requestSendItemMsg.setPlatformId(platformId); requestSendItemMsg.setPlatformName(platformName); requestSendItemMsg.setChannelId(channelId); requestSendItemMsg.setTcp(isTcp); return requestSendItemMsg; } public String getServerId() { return serverId; } public void setServerId(String serverId) { this.serverId = serverId; } public String getMediaServerId() { return mediaServerId; } public void setMediaServerId(String mediaServerId) { this.mediaServerId = mediaServerId; } public String getApp() { return app; } public void setApp(String app) { this.app = app; } public String getStream() { return stream; } public void setStream(String stream) { this.stream = stream; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public String getSsrc() { return ssrc; } public void setSsrc(String ssrc) { this.ssrc = ssrc; } public String getPlatformId() { return platformId; } public void setPlatformId(String platformId) { this.platformId = platformId; } public String getPlatformName() { return platformName; } public void setPlatformName(String platformName) { this.platformName = platformName; } public String getChannelId() { return channelId; } public void setChannelId(String channelId) { this.channelId = channelId; } public Boolean getTcp() { return isTcp; } public void setTcp(Boolean tcp) { isTcp = tcp; } } src/main/java/com/genersoft/iot/vmp/service/bean/ResponseSendItemMsg.java
New file @@ -0,0 +1,31 @@ package com.genersoft.iot.vmp.service.bean; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; /** * redis消息:下级回复推送信息 * @author lin */ public class ResponseSendItemMsg { private SendRtpItem sendRtpItem; private MediaServerItem mediaServerItem; public SendRtpItem getSendRtpItem() { return sendRtpItem; } public void setSendRtpItem(SendRtpItem sendRtpItem) { this.sendRtpItem = sendRtpItem; } public MediaServerItem getMediaServerItem() { return mediaServerItem; } public void setMediaServerItem(MediaServerItem mediaServerItem) { this.mediaServerItem = mediaServerItem; } } src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsg.java
New file @@ -0,0 +1,116 @@ package com.genersoft.iot.vmp.service.bean; /** * @author lin */ public class WvpRedisMsg { public static WvpRedisMsg getInstance(String fromId, String toId, String type, String cmd, String serial, String content){ WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); wvpRedisMsg.setFromId(fromId); wvpRedisMsg.setToId(toId); wvpRedisMsg.setType(type); wvpRedisMsg.setCmd(cmd); wvpRedisMsg.setSerial(serial); wvpRedisMsg.setContent(content); return wvpRedisMsg; } private String fromId; private String toId; /** * req 请求, res 回复 */ private String type; private String cmd; /** * 消息的ID */ private String serial; private Object content; private final static String requestTag = "req"; private final static String responseTag = "res"; public static WvpRedisMsg getRequestInstance(String fromId, String toId, String cmd, String serial, Object content) { WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); wvpRedisMsg.setType(requestTag); wvpRedisMsg.setFromId(fromId); wvpRedisMsg.setToId(toId); wvpRedisMsg.setCmd(cmd); wvpRedisMsg.setSerial(serial); wvpRedisMsg.setContent(content); return wvpRedisMsg; } public static WvpRedisMsg getResponseInstance() { WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); wvpRedisMsg.setType(responseTag); return wvpRedisMsg; } public static WvpRedisMsg getResponseInstance(String fromId, String toId, String cmd, String serial, Object content) { WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); wvpRedisMsg.setType(responseTag); wvpRedisMsg.setFromId(fromId); wvpRedisMsg.setToId(toId); wvpRedisMsg.setCmd(cmd); wvpRedisMsg.setSerial(serial); wvpRedisMsg.setContent(content); return wvpRedisMsg; } public static boolean isRequest(WvpRedisMsg wvpRedisMsg) { return requestTag.equals(wvpRedisMsg.getType()); } public String getSerial() { return serial; } public void setSerial(String serial) { this.serial = serial; } public String getFromId() { return fromId; } public void setFromId(String fromId) { this.fromId = fromId; } public String getToId() { return toId; } public void setToId(String toId) { this.toId = toId; } public String getType() { return type; } public void setType(String type) { this.type = type; } public String getCmd() { return cmd; } public void setCmd(String cmd) { this.cmd = cmd; } public Object getContent() { return content; } public void setContent(Object content) { this.content = content; } } src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java
New file @@ -0,0 +1,12 @@ package com.genersoft.iot.vmp.service.bean; /** * @author lin */ public class WvpRedisMsgCmd { public static final String GET_SEND_ITEM = "GetSendItem"; public static final String REQUEST_PUSH_STREAM = "RequestPushStream"; } src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java
New file @@ -0,0 +1,377 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; import javax.sip.SipException; import java.text.ParseException; import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** * 监听下级发送推送信息,并发送国标推流消息上级 * @author lin */ @Component public class RedisGbPlayMsgListener implements MessageListener { private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class); public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM"; /** * 流媒体不存在的错误玛 */ public static final int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1; /** * 离线的错误玛 */ public static final int ERROR_CODE_OFFLINE = -2; /** * 超时的错误玛 */ public static final int ERROR_CODE_TIMEOUT = -3; private Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>(); private Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); private Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>(); @Autowired private UserSetting userSetting; @Autowired private RedisUtil redis; @Autowired private ZLMMediaListManager zlmMediaListManager; @Autowired private ZLMRTPServerFactory zlmrtpServerFactory; @Autowired private IMediaServerService mediaServerService; @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private DynamicTask dynamicTask; @Autowired private ZLMMediaListManager mediaListManager; @Autowired private ZLMHttpHookSubscribe subscribe; public interface PlayMsgCallback{ void handler(ResponseSendItemMsg responseSendItemMsg); } public interface PlayMsgCallbackForStartSendRtpStream{ void handler(JSONObject jsonObject); } public interface PlayMsgErrorCallback{ void handler(WVPResult wvpResult); } @Override public void onMessage(Message message, byte[] bytes) { JSONObject msgJSON = JSON.parseObject(message.getBody(), JSONObject.class); WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { return; } if (WvpRedisMsg.isRequest(wvpRedisMsg)) { logger.info("[收到REDIS通知] 请求: {}", new String(message.getBody())); switch (wvpRedisMsg.getCmd()){ case WvpRedisMsgCmd.GET_SEND_ITEM: RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class); requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); break; case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);; requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); break; default: break; } }else { logger.info("[收到REDIS通知] 回复: {}", new String(message.getBody())); switch (wvpRedisMsg.getCmd()){ case WvpRedisMsgCmd.GET_SEND_ITEM: WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); String key = wvpRedisMsg.getSerial(); switch (content.getCode()) { case 0: ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class); PlayMsgCallback playMsgCallback = callbacks.get(key); if (playMsgCallback != null) { callbacksForError.remove(key); playMsgCallback.handler(responseSendItemMsg); } break; case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: case ERROR_CODE_OFFLINE: case ERROR_CODE_TIMEOUT: PlayMsgErrorCallback errorCallback = callbacksForError.get(key); if (errorCallback != null) { callbacks.remove(key); errorCallback.handler(content); } break; default: break; } break; case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); String serial = wvpRedisMsg.getSerial(); switch (wvpResult.getCode()) { case 0: JSONObject jsonObject = (JSONObject)wvpResult.getData(); PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); if (playMsgCallback != null) { callbacksForError.remove(serial); playMsgCallback.handler(jsonObject); } break; case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: case ERROR_CODE_OFFLINE: case ERROR_CODE_TIMEOUT: PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); if (errorCallback != null) { callbacks.remove(serial); errorCallback.handler(wvpResult); } break; default: break; } break; default: break; } } } /** * 处理收到的请求推流的请求 */ private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) { MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); if (mediaInfo == null) { // TODO 回复错误 return; } String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1"; Map<String, Object> param = new HashMap<>(); param.put("vhost","__defaultVhost__"); param.put("app",requestPushStreamMsg.getApp()); param.put("stream",requestPushStreamMsg.getStream()); param.put("ssrc", requestPushStreamMsg.getSsrc()); param.put("dst_url",requestPushStreamMsg.getIp()); param.put("dst_port", requestPushStreamMsg.getPort()); param.put("is_udp", is_Udp); param.put("src_port", requestPushStreamMsg.getSrcPort()); param.put("pt", requestPushStreamMsg.getPt()); param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0"); param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0"); JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); // 回复消息 responsePushStream(jsonObject, fromId, serial); } private void responsePushStream(JSONObject content, String toId, String serial) { WVPResult<JSONObject> result = new WVPResult<>(); result.setCode(0); result.setData(content); WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } /** * 处理收到的请求sendItem的请求 */ private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) { MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId()); if (mediaServerItem == null) { logger.info("[回复推流信息] 流媒体{}不存在 ", content.getMediaServerId()); WVPResult<SendRtpItem> result = new WVPResult<>(); result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND); result.setMsg("流媒体不存在"); WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); return; } // 确定流是否在线 boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); if (streamReady) { logger.info("[回复推流信息] {}/{}", content.getApp(), content.getStream()); responseSendItem(mediaServerItem, content, toId, serial); }else { // 流已经离线 // 发送redis消息以使设备上线 logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流",content.getApp(), content.getStream()); String taskKey = UUID.randomUUID().toString(); // 设置超时 dynamicTask.startDelay(taskKey, ()->{ logger.info("[ app={}, stream={} ] 等待设备开始推流超时", content.getApp(), content.getStream()); WVPResult<SendRtpItem> result = new WVPResult<>(); result.setCode(ERROR_CODE_TIMEOUT); WvpRedisMsg response = WvpRedisMsg.getResponseInstance( userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result ); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); }, userSetting.getPlatformPlayTimeout()); // 添加订阅 JSONObject subscribeKey = new JSONObject(); subscribeKey.put("app", content.getApp()); subscribeKey.put("stream", content.getStream()); subscribeKey.put("regist", true); subscribeKey.put("schema", "rtmp"); subscribeKey.put("mediaServerId", mediaServerItem.getId()); subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ dynamicTask.stop(taskKey); responseSendItem(mediaServerItem, content, toId, serial); }); MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(), content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(), content.getMediaServerId()); redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); } } /** * 将获取到的sendItem发送出去 */ private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), content.getPort(), content.getSsrc(), content.getPlatformId(), content.getApp(), content.getStream(), content.getChannelId(), content.getTcp()); WVPResult<ResponseSendItemMsg> result = new WVPResult<>(); result.setCode(0); ResponseSendItemMsg responseSendItemMsg = new ResponseSendItemMsg(); responseSendItemMsg.setSendRtpItem(sendRtpItem); responseSendItemMsg.setMediaServerItem(mediaServerItem); result.setData(responseSendItemMsg); WvpRedisMsg response = WvpRedisMsg.getResponseInstance( userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result ); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } /** * 发送消息要求下级生成推流信息 * @param serverId 下级服务ID * @param app 应用名 * @param stream 流ID * @param ip 目标IP * @param port 目标端口 * @param ssrc ssrc * @param platformId 平台国标编号 * @param channelId 通道ID * @param isTcp 是否使用TCP * @param callback 得到信息的回调 */ public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc, String platformId, String channelId, boolean isTcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) { RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance( serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, platformName); requestSendItemMsg.setServerId(serverId); String key = UUID.randomUUID().toString(); WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM, key, requestSendItemMsg); JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); logger.info("[请求推流SendItem] {}: {}", serverId, jsonObject); callbacks.put(key, callback); callbacksForError.put(key, errorCallback); dynamicTask.startDelay(key, ()->{ callbacks.remove(key); callbacksForError.remove(key); WVPResult<Object> wvpResult = new WVPResult<>(); wvpResult.setCode(ERROR_CODE_TIMEOUT); wvpResult.setMsg("timeout"); errorCallback.handler(wvpResult); }, userSetting.getPlatformPlayTimeout()); redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } /** * 发送请求推流的消息 * @param param 推流参数 * @param callback 回调 */ public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) { String key = UUID.randomUUID().toString(); WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, param); JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); logger.info("[REDIS 请求其他平台推流] {}: {}", serverId, jsonObject); dynamicTask.startDelay(key, ()->{ callbacksForStartSendRtpStream.remove(key); callbacksForError.remove(key); }, userSetting.getPlatformPlayTimeout()); callbacksForStartSendRtpStream.put(key, callback); callbacksForError.put(key, (wvpResult)->{ logger.info("[REDIS 请求其他平台推流] 失败: {}", wvpResult.getMsg()); callbacksForStartSendRtpStream.remove(key); callbacksForError.remove(key); }); redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } } src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java
File was renamed from src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java @@ -3,6 +3,7 @@ import com.alibaba.fastjson.JSON; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -10,17 +11,23 @@ import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; /** * 接收来自redis的GPS更新通知 * @author lin */ @Component public class RedisGPSMsgListener implements MessageListener { public class RedisGpsMsgListener implements MessageListener { private final static Logger logger = LoggerFactory.getLogger(RedisGPSMsgListener.class); private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class); @Autowired private IRedisCatchStorage redisCatchStorage; @Override public void onMessage(Message message, byte[] bytes) { logger.info("收到来自REDIS的GPS通知: {}", new String(message.getBody())); public void onMessage(@NotNull Message message, byte[] bytes) { if (logger.isDebugEnabled()) { logger.debug("收到来自REDIS的GPS通知: {}", new String(message.getBody())); } GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class); redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); } src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java
New file @@ -0,0 +1,83 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; /** * @author lin */ @Component public class RedisStreamMsgListener implements MessageListener { private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class); @Autowired private ISIPCommander commander; @Autowired private ISIPCommanderForPlatform commanderForPlatform; @Autowired private IVideoManagerStorage storage; @Autowired private UserSetting userSetting; @Autowired private ZLMMediaListManager zlmMediaListManager; @Override public void onMessage(Message message, byte[] bytes) { JSONObject steamMsgJson = JSON.parseObject(message.getBody(), JSONObject.class); if (steamMsgJson == null) { logger.warn("[REDIS的ALARM通知]消息解析失败"); return; } String serverId = steamMsgJson.getString("serverId"); if (userSetting.getServerId().equals(serverId)) { // 自己发送的消息忽略即可 return; } logger.info("[REDIS通知] 流变化: {}", new String(message.getBody())); String app = steamMsgJson.getString("app"); String stream = steamMsgJson.getString("stream"); boolean register = steamMsgJson.getBoolean("register"); String mediaServerId = steamMsgJson.getString("mediaServerId"); MediaItem mediaItem = new MediaItem(); mediaItem.setSeverId(serverId); mediaItem.setApp(app); mediaItem.setStream(stream); mediaItem.setRegist(register); mediaItem.setMediaServerId(mediaServerId); mediaItem.setCreateStamp(System.currentTimeMillis()/1000); mediaItem.setAliveSecond(0L); mediaItem.setTotalReaderCount("0"); mediaItem.setOriginType(0); mediaItem.setOriginTypeStr("0"); mediaItem.setOriginTypeStr("unknown"); zlmMediaListManager.addPush(mediaItem); } } src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -107,6 +107,7 @@ streamPushItem.setStatus(true); streamPushItem.setStreamType("push"); streamPushItem.setVhost(item.getVhost()); streamPushItem.setServerId(item.getSeverId()); return streamPushItem; } src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java
@@ -357,6 +357,15 @@ /** * 获取但个推流 * @param app * @param stream * @return */ StreamPushItem getMedia(String app, String stream); /** * 清空推流列表 */ void clearMediaList(); src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
@@ -14,9 +14,9 @@ public interface StreamPushMapper { @Insert("INSERT INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " + "createStamp, aliveSecond, mediaServerId) VALUES" + "createStamp, aliveSecond, mediaServerId, serverId) VALUES" + "('${app}', '${stream}', '${totalReaderCount}', '${originType}', '${originTypeStr}', " + "'${createStamp}', '${aliveSecond}', '${mediaServerId}' )") "'${createStamp}', '${aliveSecond}', '${mediaServerId}' , '${serverId}' )") int add(StreamPushItem streamPushItem); @Update("UPDATE stream_push " + src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -587,11 +587,11 @@ String scanKey = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetting.getServerId() + "_*"; List<GPSMsgInfo> result = new ArrayList<>(); List<Object> keys = redis.scan(scanKey); for (int i = 0; i < keys.size(); i++) { String key = (String) keys.get(i); for (Object o : keys) { String key = (String) o; GPSMsgInfo gpsMsgInfo = (GPSMsgInfo) redis.get(key); if (!gpsMsgInfo.isStored()) { // 只取没有存过得 result.add((GPSMsgInfo)redis.get(key)); result.add((GPSMsgInfo) redis.get(key)); } } @@ -667,7 +667,7 @@ @Override public void sendStreamPushRequestedMsg(MessageForPushChannel msg) { String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED; logger.info("[redis 推流被请求通知] {}: {}-{}", key, msg.getApp(), msg.getStream()); logger.info("[redis 推流被请求通知] {}: {}/{}", key, msg.getApp(), msg.getStream()); redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg)); } src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
@@ -885,6 +885,11 @@ } @Override public StreamPushItem getMedia(String app, String stream) { return streamPushMapper.selectOne(app, stream); } @Override public void clearMediaList() { streamPushMapper.clear(); } web_src/src/components/dialog/rtcPlayer.vue
@@ -7,11 +7,11 @@ </template> <script> let webrtcPlayer = null; export default { name: 'rtcPlayer', data() { return { webrtcPlayer: null, timer: null }; }, @@ -35,7 +35,7 @@ }, methods: { play: function (url) { this.webrtcPlayer = new ZLMRTCClient.Endpoint({ webrtcPlayer = new ZLMRTCClient.Endpoint({ element: document.getElementById('webRtcPlayerBox'),// video 标签 debug: true,// 是否打印日志 zlmsdpUrl: url,//流地址 @@ -45,17 +45,17 @@ videoEnable: false, recvOnly: true, }) this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ICE_CANDIDATE_ERROR,(e)=>{// ICE 协商出错 webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ICE_CANDIDATE_ERROR,(e)=>{// ICE 协商出错 console.error('ICE 协商出错') this.eventcallbacK("ICE ERROR", "ICE 协商出错") }); this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_REMOTE_STREAMS,(e)=>{//获取到了远端流,可以播放 webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_REMOTE_STREAMS,(e)=>{//获取到了远端流,可以播放 console.error('播放成功',e.streams) this.eventcallbacK("playing", "播放成功") }); this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_OFFER_ANWSER_EXCHANGE_FAILED,(e)=>{// offer anwser 交换失败 webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_OFFER_ANWSER_EXCHANGE_FAILED,(e)=>{// offer anwser 交换失败 console.error('offer anwser 交换失败',e) this.eventcallbacK("OFFER ANSWER ERROR ", "offer anwser 交换失败") if (e.code ==-400 && e.msg=="流不存在"){ @@ -68,7 +68,7 @@ } }); this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_LOCAL_STREAM,(s)=>{// 获取到了本地流 webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_LOCAL_STREAM,(s)=>{// 获取到了本地流 // document.getElementById('selfVideo').srcObject=s; this.eventcallbacK("LOCAL STREAM", "获取到了本地流") @@ -76,9 +76,9 @@ }, pause: function () { if (this.webrtcPlayer != null) { this.webrtcPlayer.close(); this.webrtcPlayer = null; if (webrtcPlayer != null) { webrtcPlayer.close(); webrtcPlayer = null; } },