648540858
2022-03-04 b10a65483d709838fbe7d871b83fd5d42f2ef37e
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -3,13 +3,15 @@
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -23,13 +25,11 @@
import java.util.HashMap;
import java.util.Map;
/**
 * @description:ACK请求处理器
 * @author: swwheihei
 * @date:   2020年5月3日 下午5:31:45
/**
 * SIP命令类型: ACK请求
 */
@Component
public class AckRequestProcessor extends SIPRequestProcessorAbstract {
public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
   private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class);
   private String method = "ACK";
@@ -72,10 +72,10 @@
         if (deviceId == null) {
            streamInfo = new StreamInfo();
            streamInfo.setApp(sendRtpItem.getApp());
            streamInfo.setStreamId(sendRtpItem.getStreamId());
            streamInfo.setStream(sendRtpItem.getStreamId());
         }else {
            streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
            sendRtpItem.setStreamId(streamInfo.getStreamId());
            sendRtpItem.setStreamId(streamInfo.getStream());
            streamInfo.setApp("rtp");
         }
@@ -85,7 +85,7 @@
         Map<String, Object> param = new HashMap<>();
         param.put("vhost","__defaultVhost__");
         param.put("app",streamInfo.getApp());
         param.put("stream",streamInfo.getStreamId());
         param.put("stream",streamInfo.getStream());
         param.put("ssrc", sendRtpItem.getSsrc());
         param.put("dst_url",sendRtpItem.getIp());
         param.put("dst_port", sendRtpItem.getPort());
@@ -98,21 +98,21 @@
            try {
               if (System.currentTimeMillis() - startTime < 30 * 1000) {
                  MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                  if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) {
                  if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStream())) {
                     rtpPushed = true;
                     logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]",
                           streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort());
                           streamInfo.getApp() ,streamInfo.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
                     zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
                  } else {
                     logger.info("等待设备推流[{}/{}].......",
                           streamInfo.getApp() ,streamInfo.getStreamId());
                           streamInfo.getApp() ,streamInfo.getStream());
                     Thread.sleep(1000);
                     continue;
                  }
               } else {
                  rtpPushed = true;
                  logger.info("设备推流[{}/{}]超时,终止向上级推流",
                        streamInfo.getApp() ,streamInfo.getStreamId());
                        streamInfo.getApp() ,streamInfo.getStream());
               }
            } catch (InterruptedException e) {
               e.printStackTrace();