package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.sip.Dialog; import javax.sip.DialogState; import javax.sip.RequestEvent; import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; import java.util.HashMap; import java.util.Map; import java.util.Timer; import java.util.TimerTask; /** * SIP命令类型: ACK请求 */ @Component public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class); private String method = "ACK"; @Autowired private SIPProcessorObserver sipProcessorObserver; @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 sipProcessorObserver.addRequestProcessor(method, this); } @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private ZLMRTPServerFactory zlmrtpServerFactory; @Autowired private IMediaServerService mediaServerService; @Autowired private ZLMHttpHookSubscribe subscribe; @Autowired private DynamicTask dynamicTask; /** * 处理 ACK请求 * * @param evt */ @Override public void process(RequestEvent evt) { Dialog dialog = evt.getDialog(); CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); if (dialog == null) return; if (dialog.getState()== DialogState.CONFIRMED) { String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); logger.info("ACK请求: platformGbId->{}", platformGbId); // 取消设置的超时任务 dynamicTask.stop(callIdHeader.getCallId()); String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; String deviceId = sendRtpItem.getDeviceId(); StreamInfo streamInfo = null; if (sendRtpItem.isPlay()) { streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); }else { streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId); } if (streamInfo == null) { streamInfo = new StreamInfo(); streamInfo.setApp(sendRtpItem.getApp()); streamInfo.setStream(sendRtpItem.getStreamId()); } redisCatchStorage.updateSendRTPSever(sendRtpItem); Map param = new HashMap<>(); param.put("vhost","__defaultVhost__"); param.put("app",streamInfo.getApp()); param.put("stream",streamInfo.getStream()); param.put("ssrc", sendRtpItem.getSsrc()); param.put("dst_url",sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); param.put("is_udp", is_Udp); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); if (jsonObject.getInteger("code") != 0) { logger.info("监听流以等待流上线{}/{}", streamInfo.getApp(), streamInfo.getStream()); // 监听流上线 // 添加订阅 JSONObject subscribeKey = new JSONObject(); subscribeKey.put("app", "rtp"); subscribeKey.put("stream", streamInfo.getStream()); subscribeKey.put("regist", true); subscribeKey.put("schema", "rtmp"); subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); }); } } } }