| | |
| | | package com.genersoft.iot.vmp.service.redisMsg; |
| | | |
| | | import com.alibaba.fastjson2.JSON; |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.genersoft.iot.vmp.conf.UserSetting; |
| | | import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; |
| | | import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; |
| | | import com.genersoft.iot.vmp.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.ObjectUtils; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | |
| | | /** |
| | |
| | | private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); |
| | | |
| | | @Autowired |
| | | private UserSetting userSetting; |
| | | private ZLMServerFactory zlmServerFactory; |
| | | |
| | | @Autowired |
| | | private ZlmHttpHookSubscribe hookSubscribe; |
| | | private IMediaServerService mediaServerService; |
| | | |
| | | @Qualifier("taskExecutor") |
| | | @Autowired |
| | |
| | | while (!taskQueue.isEmpty()) { |
| | | Message msg = taskQueue.poll(); |
| | | try { |
| | | MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class); |
| | | if (messageForPushChannel == null |
| | | || ObjectUtils.isEmpty(messageForPushChannel.getApp()) |
| | | || ObjectUtils.isEmpty(messageForPushChannel.getStream()) |
| | | || userSetting.getServerId().equals(messageForPushChannel.getServerId())){ |
| | | continue; |
| | | SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class); |
| | | sendRtpItem.getMediaServerId(); |
| | | MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); |
| | | if (mediaServer == null) { |
| | | return; |
| | | } |
| | | |
| | | // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 |
| | | HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( |
| | | messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp", |
| | | null); |
| | | hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { |
| | | // 读取redis中的上级点播信息,生成sendRtpItm发送出去 |
| | | |
| | | }); |
| | | |
| | | Map<String, Object> sendRtpParam = getSendRtpParam(sendRtpItem); |
| | | sendRtp(sendRtpItem, mediaServer, sendRtpParam); |
| | | |
| | | }catch (Exception e) { |
| | | logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); |
| | |
| | | }); |
| | | } |
| | | } |
| | | |
| | | private Map<String, Object> getSendRtpParam(SendRtpItem sendRtpItem) { |
| | | String isUdp = sendRtpItem.isTcp() ? "0" : "1"; |
| | | Map<String, Object> param = new HashMap<>(12); |
| | | param.put("vhost","__defaultVhost__"); |
| | | param.put("app",sendRtpItem.getApp()); |
| | | param.put("stream",sendRtpItem.getStream()); |
| | | param.put("ssrc", sendRtpItem.getSsrc()); |
| | | param.put("dst_url",sendRtpItem.getIp()); |
| | | param.put("dst_port", sendRtpItem.getPort()); |
| | | param.put("src_port", sendRtpItem.getLocalPort()); |
| | | param.put("pt", sendRtpItem.getPt()); |
| | | param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); |
| | | param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); |
| | | param.put("is_udp", isUdp); |
| | | if (!sendRtpItem.isTcp()) { |
| | | // udp模式下开启rtcp保活 |
| | | param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); |
| | | } |
| | | return param; |
| | | } |
| | | |
| | | private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServerItem mediaInfo, Map<String, Object> param){ |
| | | JSONObject startSendRtpStreamResult = null; |
| | | if (sendRtpItem.getLocalPort() != 0) { |
| | | if (sendRtpItem.isTcpActive()) { |
| | | startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param); |
| | | }else { |
| | | param.put("dst_url", sendRtpItem.getIp()); |
| | | param.put("dst_port", sendRtpItem.getPort()); |
| | | startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param); |
| | | } |
| | | }else { |
| | | if (sendRtpItem.isTcpActive()) { |
| | | startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param); |
| | | }else { |
| | | param.put("dst_url", sendRtpItem.getIp()); |
| | | param.put("dst_port", sendRtpItem.getPort()); |
| | | startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param); |
| | | } |
| | | } |
| | | return startSendRtpStreamResult; |
| | | |
| | | } |
| | | } |