648540858
2022-05-06 5d901b5e3f033e8b04e53420d68626cbd87431c8
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -1,12 +1,12 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
@@ -14,18 +14,18 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.SerializeUtils;
import gov.nist.javax.sdp.TimeDescriptionImpl;
import gov.nist.javax.sdp.fields.TimeField;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -33,19 +33,14 @@
import org.springframework.stereotype.Component;
import javax.sdp.*;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Vector;
/**
@@ -63,10 +58,13 @@
   private SIPCommanderFroPlatform cmderFroPlatform;
   @Autowired
   private IVideoManagerStorager storager;
   private IVideoManagerStorage storager;
   @Autowired
   private IRedisCatchStorage  redisCatchStorage;
   @Autowired
   private DynamicTask dynamicTask;
   @Autowired
   private SIPCommander cmder;
@@ -85,6 +83,15 @@
   @Autowired
   private SIPProcessorObserver sipProcessorObserver;
   @Autowired
   private VideoStreamSessionManager sessionManager;
   @Autowired
   private UserSetting userSetting;
   @Autowired
   private ZLMMediaListManager mediaListManager;
   @Override
@@ -105,7 +112,9 @@
      try {
         Request request = evt.getRequest();
         SipURI sipURI = (SipURI) request.getRequestURI();
         String channelId = sipURI.getUser();
         //从subject读取channelId,不再从request-line读取。 有些平台request-line是平台国标编码,不是设备国标编码。
         //String channelId = sipURI.getUser();
         String channelId = SipUtils.getChannelIdFromHeader(request);
         String requesterId = SipUtils.getUserIdFromFromHeader(request);
         CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
         if (requesterId == null || channelId == null) {
@@ -138,12 +147,6 @@
               if (mediaServerItem == null) {
                  logger.info("[ app={}, stream={} ]找不到zlm {},返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId);
                  responseAck(evt, Response.GONE);
                  return;
               }
               Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
               if (!streamReady ) {
                  logger.info("[ app={}, stream={} ]通道离线,返回400",gbStream.getApp(), gbStream.getStream());
                  responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
                  return;
               }
               responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
@@ -212,6 +215,9 @@
                        mediaTransmissionTCP = true;
                        if ("active".equals(setup)) {
                           tcpActive = true;
                           // 不支持tcp主动
                           responseAck(evt, Response.NOT_IMPLEMENTED, "tcp active not support"); // 目录不支持点播
                           return;
                        } else if ("passive".equals(setup)) {
                           tcpActive = false;
                        }
@@ -228,6 +234,7 @@
            }
            String username = sdp.getOrigin().getUsername();
            String addressStr = sdp.getOrigin().getAddress();
            logger.info("[上级点播]用户:{}, 地址:{}:{}, ssrc:{}", username, addressStr, port, ssrc);
            Device device  = null;
            // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
@@ -256,16 +263,17 @@
                  return;
               }
               sendRtpItem.setCallId(callIdHeader.getCallId());
               sendRtpItem.setPlay("Play".equals(sessionName));
               // 写入redis, 超时时回复
               redisCatchStorage.updateSendRTPSever(sendRtpItem);
               Device finalDevice = device;
               MediaServerItem finalMediaServerItem = mediaServerItem;
               sendRtpItem.setPlayType("Play".equals(sessionName)?InviteStreamType.PLAY:InviteStreamType.PLAYBACK);
               byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
               sendRtpItem.setDialog(dialogByteArray);
               byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
               sendRtpItem.setTransaction(transactionByteArray);
               Long finalStartTime = startTime;
               Long finalStopTime = stopTime;
               ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{
                  logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId());
                  String app = responseJSON.getString("app");
                  String stream = responseJSON.getString("stream");
                  logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", app, stream);
                  //     * 0 等待设备推流上来
                  //     * 1 下级已经推流,等待上级平台回复ack
                  //     * 2 推流中
@@ -289,7 +297,15 @@
                  content.append("f=\r\n");
                  try {
                     // 超时未收到Ack应该回复bye,当前等待时间为10秒
                     dynamicTask.startDelay(callIdHeader.getCallId(), ()->{
                        logger.info("Ack 等待超时");
                        mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), ssrc);
                        // 回复bye
                        cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
                     }, 60*1000);
                     responseSdpAck(evt, content.toString(), platform);
                  } catch (SipException e) {
                     e.printStackTrace();
                  } catch (InvalidArgumentException e) {
@@ -305,23 +321,106 @@
                     response = getMessageFactory().createResponse(event.statusCode, evt.getRequest());
                     ServerTransaction serverTransaction = getServerTransaction(evt);
                     serverTransaction.sendResponse(response);
                     if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
                     if (serverTransaction.getDialog() != null) {
                        serverTransaction.getDialog().delete();
                     }
                  } catch (ParseException | SipException | InvalidArgumentException e) {
                     e.printStackTrace();
                  }
               });
               sendRtpItem.setApp("rtp");
               if ("Playback".equals(sessionName)) {
                  sendRtpItem.setPlay(false);
                  sendRtpItem.setStreamId(ssrc);
                  sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
                  SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true);
                  sendRtpItem.setStreamId(ssrcInfo.getStream());
                  // 写入redis, 超时时回复
                  redisCatchStorage.updateSendRTPSever(sendRtpItem);
                  SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                  playService.playBack(device.getDeviceId(), channelId, format.format(start), format.format(end),result -> {
                     if (result.getCode() != 0){
                        logger.warn("录像回放失败");
                        if (result.getEvent() != null) {
                           errorEvent.response(result.getEvent());
                  playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, format.format(start),
                        format.format(end), null, result -> {
                        if (result.getCode() != 0){
                           logger.warn("录像回放失败");
                           if (result.getEvent() != null) {
                              errorEvent.response(result.getEvent());
                           }
                           redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
                           try {
                              responseAck(evt, Response.REQUEST_TIMEOUT);
                           } catch (SipException e) {
                              e.printStackTrace();
                           } catch (InvalidArgumentException e) {
                              e.printStackTrace();
                           } catch (ParseException e) {
                              e.printStackTrace();
                           }
                        }else {
                           if (result.getMediaServerItem() != null) {
                              hookEvent.response(result.getMediaServerItem(), result.getResponse());
                           }
                        }
                     });
               }else {
                  sendRtpItem.setPlayType(InviteStreamType.PLAY);
                  SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
                  if (playTransaction != null) {
                     Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream());
                     if (!streamReady) {
                        playTransaction = null;
                     }
                  }
                  if (playTransaction == null) {
                     String streamId = null;
                     if (mediaServerItem.isRtpEnable()) {
                        streamId = String.format("%s_%s", device.getDeviceId(), channelId);
                     }
                     SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, true);
                     sendRtpItem.setStreamId(ssrcInfo.getStream());
                     // 写入redis, 超时时回复
                     redisCatchStorage.updateSendRTPSever(sendRtpItem);
                     playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg)->{
                        redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
                     }, null);
                  }else {
                     sendRtpItem.setStreamId(playTransaction.getStream());
                     // 写入redis, 超时时回复
                     redisCatchStorage.updateSendRTPSever(sendRtpItem);
                     JSONObject jsonObject = new JSONObject();
                     jsonObject.put("app", sendRtpItem.getApp());
                     jsonObject.put("stream", sendRtpItem.getStreamId());
                     hookEvent.response(mediaServerItem, jsonObject);
                  }
               }
            }else if (gbStream != null) {
               Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
               if (!streamReady ) {
                  if ("proxy".equals(gbStream.getStreamType())) {
                     // TODO 控制启用以使设备上线
                     logger.info("[ app={}, stream={} ]通道离线,启用流后开始推流",gbStream.getApp(), gbStream.getStream());
                     responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
                  }else if ("push".equals(gbStream.getStreamType())) {
                     if (!platform.isStartOfflinePush()) {
                        responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable");
                        return;
                     }
                     // 发送redis消息以使设备上线
                     logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流",gbStream.getApp(), gbStream.getStream());
                     MessageForPushChannel messageForPushChannel = new MessageForPushChannel();
                     messageForPushChannel.setType(1);
                     messageForPushChannel.setGbId(gbStream.getGbId());
                     messageForPushChannel.setApp(gbStream.getApp());
                     messageForPushChannel.setStream(gbStream.getStream());
                     // TODO 获取低负载的节点
                     messageForPushChannel.setMediaServerId(gbStream.getMediaServerId());
                     messageForPushChannel.setPlatFormId(platform.getServerGBId());
                     messageForPushChannel.setPlatFormName(platform.getName());
                     redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
                     // 设置超时
                     dynamicTask.startDelay(callIdHeader.getCallId(), ()->{
                        logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
                        try {
                           responseAck(evt, Response.REQUEST_TIMEOUT);
                           mediaListManager.removedChannelOnlineEventLister(gbStream.getGbId());
                           responseAck(evt, Response.REQUEST_TIMEOUT); // 超时
                        } catch (SipException e) {
                           e.printStackTrace();
                        } catch (InvalidArgumentException e) {
@@ -329,66 +428,72 @@
                        } catch (ParseException e) {
                           e.printStackTrace();
                        }
                     }else {
                        if (result.getMediaServerItem() != null) {
                           hookEvent.response(result.getMediaServerItem(), result.getResponse());
                     }, userSetting.getPlatformPlayTimeout());
                     // 添加监听
                     MediaServerItem finalMediaServerItem = mediaServerItem;
                     int finalPort = port;
                     boolean finalMediaTransmissionTCP = mediaTransmissionTCP;
                     Boolean finalTcpActive = tcpActive;
                     mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream)->{
                        SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(finalMediaServerItem, addressStr, finalPort, ssrc, requesterId,
                              app, stream, channelId, finalMediaTransmissionTCP);
                        if (sendRtpItem == null) {
                           logger.warn("服务器端口资源不足");
                           try {
                              responseAck(evt, Response.BUSY_HERE);
                           } catch (SipException e) {
                              e.printStackTrace();
                           } catch (InvalidArgumentException e) {
                              e.printStackTrace();
                           } catch (ParseException e) {
                              e.printStackTrace();
                           }
                           return;
                        }
                     }
                  });
               }else {
                  sendRtpItem.setPlay(true);
                  StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
                  if (streamInfo == null) {
                     if (mediaServerItem.isRtpEnable()) {
                        sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId));
                     }
                     sendRtpItem.setPlay(false);
                     playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent,errorEvent);
                  }else {
                     sendRtpItem.setStreamId(streamInfo.getStream());
                     hookEvent.response(mediaServerItem, null);
                        if (finalTcpActive != null) {
                           sendRtpItem.setTcpActive(finalTcpActive);
                        }
                        sendRtpItem.setPlayType(InviteStreamType.PUSH);
                        // 写入redis, 超时时回复
                        sendRtpItem.setStatus(1);
                        sendRtpItem.setCallId(callIdHeader.getCallId());
                        byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
                        sendRtpItem.setDialog(dialogByteArray);
                        byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
                        sendRtpItem.setTransaction(transactionByteArray);
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
                        sendStreamAck(finalMediaServerItem, sendRtpItem, platform, evt);
                     });
                  }
               }
            }else if (gbStream != null) {
               SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                     gbStream.getApp(), gbStream.getStream(), channelId,
                     mediaTransmissionTCP);
               }else {
                  SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId,
                        mediaTransmissionTCP);
               if (tcpActive != null) {
                  sendRtpItem.setTcpActive(tcpActive);
               }
               if (sendRtpItem == null) {
                  logger.warn("服务器端口资源不足");
                  responseAck(evt, Response.BUSY_HERE);
                  return;
                  if (sendRtpItem == null) {
                     logger.warn("服务器端口资源不足");
                     responseAck(evt, Response.BUSY_HERE);
                     return;
                  }
                  if (tcpActive != null) {
                     sendRtpItem.setTcpActive(tcpActive);
                  }
                  sendRtpItem.setPlayType(InviteStreamType.PUSH);
                  // 写入redis, 超时时回复
                  sendRtpItem.setStatus(1);
                  sendRtpItem.setCallId(callIdHeader.getCallId());
                  byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
                  sendRtpItem.setDialog(dialogByteArray);
                  byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
                  sendRtpItem.setTransaction(transactionByteArray);
                  redisCatchStorage.updateSendRTPSever(sendRtpItem);
                  sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
               }
               // 写入redis, 超时时回复
               redisCatchStorage.updateSendRTPSever(sendRtpItem);
               sendRtpItem.setStatus(1);
               redisCatchStorage.updateSendRTPSever(sendRtpItem);
               StringBuffer content = new StringBuffer(200);
               content.append("v=0\r\n");
               content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
               content.append("s=Play\r\n");
               content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
               content.append("t=0 0\r\n");
               content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
               content.append("a=sendonly\r\n");
               content.append("a=rtpmap:96 PS/90000\r\n");
               content.append("y="+ ssrc + "\r\n");
               content.append("f=\r\n");
               try {
                  responseSdpAck(evt, content.toString(), platform);
               } catch (SipException e) {
                  e.printStackTrace();
               } catch (InvalidArgumentException e) {
                  e.printStackTrace();
               } catch (ParseException e) {
                  e.printStackTrace();
               }
            }
         }
@@ -404,6 +509,39 @@
      }
   }
   public void sendStreamAck(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt){
      StringBuffer content = new StringBuffer(200);
      content.append("v=0\r\n");
      content.append("o="+ sendRtpItem.getChannelId() +" 0 0 IN IP4 "+ mediaServerItem.getSdpIp()+"\r\n");
      content.append("s=Play\r\n");
      content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
      content.append("t=0 0\r\n");
      content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
      content.append("a=sendonly\r\n");
      content.append("a=rtpmap:96 PS/90000\r\n");
      if (sendRtpItem.isTcp()) {
         content.append("a=connection:new\r\n");
         if (!sendRtpItem.isTcpActive()) {
            content.append("a=setup:active\r\n");
         }else {
            content.append("a=setup:passive\r\n");
         }
      }
      content.append("y="+ sendRtpItem.getSsrc() + "\r\n");
      content.append("f=\r\n");
      try {
         responseSdpAck(evt, content.toString(), platform);
      } catch (SipException e) {
         e.printStackTrace();
      } catch (InvalidArgumentException e) {
         e.printStackTrace();
      } catch (ParseException e) {
         e.printStackTrace();
      }
   }
   public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException {
      // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)