From 6ac131bcf6ec034ffc9e5813c68c56c6e5c32b70 Mon Sep 17 00:00:00 2001 From: xiaoxie <hotcoffie@163.com> Date: 星期三, 01 十二月 2021 22:45:51 +0800 Subject: [PATCH] Merge remote-tracking branch 'origin/wvp-28181-2.0' into wvp-28181-2.0 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java | 265 ++++++++++++++++++++++++---------------------------- 1 files changed, 123 insertions(+), 142 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java similarity index 68% rename from src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java rename to src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 7d9e5f7..127ef29 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -1,142 +1,123 @@ -package com.genersoft.iot.vmp.gb28181.transmit.request.impl; - -import java.util.HashMap; -import java.util.Map; - -import javax.sip.*; -import javax.sip.address.SipURI; -import javax.sip.header.FromHeader; -import javax.sip.header.HeaderAddress; -import javax.sip.header.ToHeader; - -import com.genersoft.iot.vmp.common.StreamInfo; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; -import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @Description:ACK璇锋眰澶勭悊鍣� - * @author: swwheihei - * @date: 2020骞�5鏈�3鏃� 涓嬪崍5:31:45 - */ -public class AckRequestProcessor extends SIPRequestAbstractProcessor { - - private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class); - - private IRedisCatchStorage redisCatchStorage; - - private ZLMRTPServerFactory zlmrtpServerFactory; - - private IMediaServerService mediaServerService; - - /** - * 澶勭悊 ACK璇锋眰 - * - * @param evt - */ - @Override - public void process(RequestEvent evt) { - //Request request = evt.getRequest(); - Dialog dialog = evt.getDialog(); - if (dialog == null) return; - //DialogState state = dialog.getState(); - if (/*request.getMecodewwthod().equals(Request.INVITE) &&*/ dialog.getState()== DialogState.CONFIRMED) { - String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); - String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); - String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; - String deviceId = sendRtpItem.getDeviceId(); - StreamInfo streamInfo = null; - if (deviceId == null) { - streamInfo = new StreamInfo(); - streamInfo.setApp(sendRtpItem.getApp()); - streamInfo.setStreamId(sendRtpItem.getStreamId()); - }else { - streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); - sendRtpItem.setStreamId(streamInfo.getStreamId()); - streamInfo.setApp("rtp"); - } - - redisCatchStorage.updateSendRTPSever(sendRtpItem); - logger.info(platformGbId); - logger.info(channelId); - Map<String, Object> param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",streamInfo.getApp()); - param.put("stream",streamInfo.getStreamId()); - param.put("ssrc", sendRtpItem.getSsrc()); - param.put("dst_url",sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - param.put("is_udp", is_Udp); - //param.put ("src_port", sendRtpItem.getLocalPort()); - // 璁惧鎺ㄦ祦鏌ヨ锛屾垚鍔熷悗鎵嶈兘杞帹 - boolean rtpPushed = false; - long startTime = System.currentTimeMillis(); - while (!rtpPushed) { - try { - if (System.currentTimeMillis() - startTime < 30 * 1000) { - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) { - rtpPushed = true; - logger.info("宸茶幏鍙栬澶囨帹娴乕{}/{}]锛屽紑濮嬪悜涓婄骇鎺ㄦ祦[{}:{}]", - streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); - zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - } else { - logger.info("绛夊緟璁惧鎺ㄦ祦[{}/{}].......", - streamInfo.getApp() ,streamInfo.getStreamId()); - Thread.sleep(1000); - continue; - } - } else { - rtpPushed = true; - logger.info("璁惧鎺ㄦ祦[{}/{}]瓒呮椂锛岀粓姝㈠悜涓婄骇鎺ㄦ祦", - streamInfo.getApp() ,streamInfo.getStreamId()); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - // try { - // Request ackRequest = null; - // CSeq csReq = (CSeq) request.getHeader(CSeq.NAME); - // ackRequest = dialog.createAck(csReq.getSeqNumber()); - // dialog.sendAck(ackRequest); - // logger.info("send ack to callee:" + ackRequest.toString()); - // } catch (SipException e) { - // e.printStackTrace(); - // } catch (InvalidArgumentException e) { - // e.printStackTrace(); - // } - - } - - public IRedisCatchStorage getRedisCatchStorage() { - return redisCatchStorage; - } - - public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { - this.redisCatchStorage = redisCatchStorage; - } - - public ZLMRTPServerFactory getZlmrtpServerFactory() { - return zlmrtpServerFactory; - } - - public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) { - this.zlmrtpServerFactory = zlmrtpServerFactory; - } - - public IMediaServerService getMediaServerService() { - return mediaServerService; - } - - public void setMediaServerService(IMediaServerService mediaServerService) { - this.mediaServerService = mediaServerService; - } -} +package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; + +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.sip.Dialog; +import javax.sip.DialogState; +import javax.sip.RequestEvent; +import javax.sip.address.SipURI; +import javax.sip.header.FromHeader; +import javax.sip.header.HeaderAddress; +import javax.sip.header.ToHeader; +import java.util.HashMap; +import java.util.Map; + +/** + * SIP鍛戒护绫诲瀷锛� ACK璇锋眰 + */ +@Component +public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { + + private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class); + private String method = "ACK"; + + @Autowired + private SIPProcessorObserver sipProcessorObserver; + + @Override + public void afterPropertiesSet() throws Exception { + // 娣诲姞娑堟伅澶勭悊鐨勮闃� + sipProcessorObserver.addRequestProcessor(method, this); + } + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + + @Autowired + private IMediaServerService mediaServerService; + + + /** + * 澶勭悊 ACK璇锋眰 + * + * @param evt + */ + @Override + public void process(RequestEvent evt) { + Dialog dialog = evt.getDialog(); + if (dialog == null) return; + if (dialog.getState()== DialogState.CONFIRMED) { + String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); + String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + String deviceId = sendRtpItem.getDeviceId(); + StreamInfo streamInfo = null; + if (deviceId == null) { + streamInfo = new StreamInfo(); + streamInfo.setApp(sendRtpItem.getApp()); + streamInfo.setStreamId(sendRtpItem.getStreamId()); + }else { + streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); + sendRtpItem.setStreamId(streamInfo.getStreamId()); + streamInfo.setApp("rtp"); + } + + redisCatchStorage.updateSendRTPSever(sendRtpItem); + logger.info(platformGbId); + logger.info(channelId); + Map<String, Object> param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app",streamInfo.getApp()); + param.put("stream",streamInfo.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("dst_url",sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + param.put("is_udp", is_Udp); + //param.put ("src_port", sendRtpItem.getLocalPort()); + // 璁惧鎺ㄦ祦鏌ヨ锛屾垚鍔熷悗鎵嶈兘杞帹 + boolean rtpPushed = false; + long startTime = System.currentTimeMillis(); + while (!rtpPushed) { + try { + if (System.currentTimeMillis() - startTime < 30 * 1000) { + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) { + rtpPushed = true; + logger.info("宸茶幏鍙栬澶囨帹娴乕{}/{}]锛屽紑濮嬪悜涓婄骇鎺ㄦ祦[{}:{}]", + streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); + zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } else { + logger.info("绛夊緟璁惧鎺ㄦ祦[{}/{}].......", + streamInfo.getApp() ,streamInfo.getStreamId()); + Thread.sleep(1000); + continue; + } + } else { + rtpPushed = true; + logger.info("璁惧鎺ㄦ祦[{}/{}]瓒呮椂锛岀粓姝㈠悜涓婄骇鎺ㄦ祦", + streamInfo.getApp() ,streamInfo.getStreamId()); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } +} -- Gitblit v1.8.0