648540858
2022-06-27 26739237e2d93460eb869067a6004bfa63a1bdb8
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -26,11 +26,14 @@
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItemLite;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -68,13 +71,16 @@
   private final static Logger logger = LoggerFactory.getLogger(InviteRequestProcessor.class);
   private String method = "INVITE";
    private final String method = "INVITE";
   @Autowired
   private SIPCommanderFroPlatform cmderFroPlatform;
   @Autowired
   private IVideoManagerStorage storager;
    @Autowired
    private IStreamPushService streamPushService;
   @Autowired
   private IRedisCatchStorage  redisCatchStorage;
@@ -89,10 +95,16 @@
   private IPlayService playService;
   @Autowired
    private ISIPCommander commander;
   @Autowired
   private AudioBroadcastManager audioBroadcastManager;
   @Autowired
   private ZLMRTPServerFactory zlmrtpServerFactory;
    @Autowired
    private IMediaServerService mediaServerService;
   @Autowired
   private ZLMRESTfulUtils zlmresTfulUtils;
@@ -123,6 +135,10 @@
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
   @Override
   public void afterPropertiesSet() throws Exception {
      // 添加消息处理的订阅
@@ -132,15 +148,14 @@
   /**
    * 处理invite请求
    * 
    * @param evt
    *            请求消息
     * @param evt 请求消息
    */ 
   @Override
   public void process(RequestEvent evt) {
      //  Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令
      try {
         Request request = evt.getRequest();
         SipURI sipURI = (SipURI) request.getRequestURI();
            SipURI sipUri = (SipURI) request.getRequestURI();
         //从subject读取channelId,不再从request-line读取。 有些平台request-line是平台国标编码,不是设备国标编码。
         //String channelId = sipURI.getUser();
         String channelId = SipUtils.getChannelIdFromHeader(request);
@@ -148,20 +163,23 @@
         CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
         if (requesterId == null || channelId == null) {
            logger.info("无法从FromHeader的Address中获取到平台id,返回400");
            responseAck(evt, Response.BAD_REQUEST); // 参数不全, 发400,请求错误
                // 参数不全, 发400,请求错误
                responseAck(evt, Response.BAD_REQUEST);
            return;
         }
         // 查询请求是否来自上级平台\设备
         ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
         if (platform == null) {
            inviteFromDeviceHandle(evt, requesterId, channelId);
                inviteFromDeviceHandle(evt, requesterId);
         }else {
            // 查询平台下是否有该通道
            DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
            GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
            PlatformCatalog catalog = storager.getCatalog(channelId);
            MediaServerItem mediaServerItem = null;
                StreamPushItem streamPushItem = null;
            // 不是通道可能是直播流
            if (channel != null && gbStream == null ) {
               if (channel.getStatus() == 0) {
@@ -171,12 +189,31 @@
               }
               responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
            }else if(channel == null && gbStream != null){
               String mediaServerId = gbStream.getMediaServerId();
               mediaServerItem = mediaServerService.getOne(mediaServerId);
               if (mediaServerItem == null) {
                        if ("proxy".equals(gbStream.getStreamType())) {
                  logger.info("[ app={}, stream={} ]找不到zlm {},返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId);
                  responseAck(evt, Response.GONE);
                  return;
                        } else {
                            streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
                            if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) {
                                logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
                                responseAck(evt, Response.GONE);
                                return;
                            }
                        }
                    } else {
                        if ("push".equals(gbStream.getStreamType())) {
                            streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
                            if (streamPushItem == null) {
                                logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
                                responseAck(evt, Response.GONE);
                                return;
                            }
                        }
               }
               responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
            }else if (catalog != null) {
@@ -217,8 +254,8 @@
               startTime = startTimeFiled.getStartTime();
               stopTime = startTimeFiled.getStopTime();
               start = Instant.ofEpochMilli(startTime*1000);
               end = Instant.ofEpochMilli(stopTime*1000);
                    start = Instant.ofEpochSecond(startTime);
                    end = Instant.ofEpochSecond(stopTime);
            }
            //  获取支持的格式
            Vector mediaDescriptions = sdp.getMediaDescriptions(true);
@@ -264,7 +301,7 @@
            String username = sdp.getOrigin().getUsername();
            String addressStr = sdp.getOrigin().getAddress();
            logger.info("[上级点播]用户:{}, 地址:{}:{}, ssrc:{}", username, addressStr, port, ssrc);
                logger.info("[上级点播]用户:{}, 通道:{}, 地址:{}:{}, ssrc:{}", username, channelId, addressStr, port, ssrc);
            Device device  = null;
            // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
            if (channel != null) {
@@ -293,10 +330,7 @@
               }
               sendRtpItem.setCallId(callIdHeader.getCallId());
               sendRtpItem.setPlayType("Play".equals(sessionName)?InviteStreamType.PLAY:InviteStreamType.PLAYBACK);
               byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
               sendRtpItem.setDialog(dialogByteArray);
               byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
               sendRtpItem.setTransaction(transactionByteArray);
               Long finalStartTime = startTime;
               Long finalStopTime = stopTime;
               ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{
@@ -401,11 +435,12 @@
                     if (mediaServerItem.isRtpEnable()) {
                        streamId = String.format("%s_%s", device.getDeviceId(), channelId);
                     }
                     SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, true, false);
                            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false);
                     sendRtpItem.setStreamId(ssrcInfo.getStream());
                     // 写入redis, 超时时回复
                     redisCatchStorage.updateSendRTPSever(sendRtpItem);
                     playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg)->{
                                logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId);
                        redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
                     }, null);
                  }else {
@@ -419,9 +454,87 @@
                  }
               }
            }else if (gbStream != null) {
                    if (streamPushItem.isStatus()) {
                        // 在线状态
                        pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                    } else {
                        // 不在线 拉起
                        notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                    }
                }
            }
        } catch (SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
            logger.warn("sdp解析错误");
            e.printStackTrace();
        } catch (SdpParseException e) {
            e.printStackTrace();
        } catch (SdpException e) {
            e.printStackTrace();
        }
    }
    /**
     * 安排推流
     */
    private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
        // 推流
        if (streamPushItem.getServerId().equals(userSetting.getServerId())) {
               Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
               if (!streamReady ) {
            if (streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId,
                        mediaTransmissionTCP);
                if (sendRtpItem == null) {
                    logger.warn("服务器端口资源不足");
                    responseAck(evt, Response.BUSY_HERE);
                    return;
                }
                if (tcpActive != null) {
                    sendRtpItem.setTcpActive(tcpActive);
                }
                sendRtpItem.setPlayType(InviteStreamType.PUSH);
                // 写入redis, 超时时回复
                sendRtpItem.setStatus(1);
                sendRtpItem.setCallId(callIdHeader.getCallId());
                byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
                sendRtpItem.setDialog(dialogByteArray);
                byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
                sendRtpItem.setTransaction(transactionByteArray);
                redisCatchStorage.updateSendRTPSever(sendRtpItem);
                sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
            } else {
                // 不在线 拉起
                notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
            }
        } else {
            // 其他平台内容
            otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
        }
    }
    /**
     * 通知流上线
     */
    private void notifyStreamOnline(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                                    CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                                    int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                                    String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
                  if ("proxy".equals(gbStream.getStreamType())) {
                     // TODO 控制启用以使设备上线
                     logger.info("[ app={}, stream={} ]通道离线,启用流后开始推流",gbStream.getApp(), gbStream.getStream());
@@ -433,15 +546,10 @@
                     }
                     // 发送redis消息以使设备上线
                     logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流",gbStream.getApp(), gbStream.getStream());
                     MessageForPushChannel messageForPushChannel = new MessageForPushChannel();
                     messageForPushChannel.setType(1);
                     messageForPushChannel.setGbId(gbStream.getGbId());
                     messageForPushChannel.setApp(gbStream.getApp());
                     messageForPushChannel.setStream(gbStream.getStream());
                     // TODO 获取低负载的节点
                     messageForPushChannel.setMediaServerId(gbStream.getMediaServerId());
                     messageForPushChannel.setPlatFormId(platform.getServerGBId());
                     messageForPushChannel.setPlatFormName(platform.getName());
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
                    gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
                    platform.getName(), null, gbStream.getMediaServerId());
                     redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
                     // 设置超时
                     dynamicTask.startDelay(callIdHeader.getCallId(), ()->{
@@ -458,13 +566,15 @@
                        }
                     }, userSetting.getPlatformPlayTimeout());
                     // 添加监听
                     MediaServerItem finalMediaServerItem = mediaServerItem;
                     int finalPort = port;
                     boolean finalMediaTransmissionTCP = mediaTransmissionTCP;
                     Boolean finalTcpActive = tcpActive;
                     mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream)->{
                        SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(finalMediaServerItem, addressStr, finalPort, ssrc, requesterId,
                              app, stream, channelId, finalMediaTransmissionTCP);
            // 添加在本机上线的通知
            mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream, serverId) -> {
                dynamicTask.stop(callIdHeader.getCallId());
                if (serverId.equals(userSetting.getServerId())) {
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                            app, stream, channelId, mediaTransmissionTCP);
                        if (sendRtpItem == null) {
                           logger.warn("服务器端口资源不足");
@@ -491,20 +601,43 @@
                        byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
                        sendRtpItem.setTransaction(transactionByteArray);
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
                        sendStreamAck(finalMediaServerItem, sendRtpItem, platform, evt);
                    sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
                } else {
                    // 其他平台内容
                    otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                            mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                }
                     });
                  }
               }else {
                  SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId,
                        mediaTransmissionTCP);
    }
                  if (sendRtpItem == null) {
    /**
     * 来自其他wvp的推流
     */
    private void otherWvpPushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                                    CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                                    int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                                    String channelId, String addressStr, String ssrc, String requesterId) {
        logger.info("[级联点播]直播流来自其他平台,发送redis消息");
        // 发送redis消息
        redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(),
                streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId,
                channelId, mediaTransmissionTCP, null, responseSendItemMsg -> {
                    SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem();
                    if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) {
                     logger.warn("服务器端口资源不足");
                        try {
                     responseAck(evt, Response.BUSY_HERE);
                        } catch (SipException e) {
                            e.printStackTrace();
                        } catch (InvalidArgumentException e) {
                            e.printStackTrace();
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                     return;
                  }
                    // 收到sendItem
                  if (tcpActive != null) {
                     sendRtpItem.setTcpActive(tcpActive);
                  }
@@ -517,23 +650,45 @@
                  byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
                  sendRtpItem.setTransaction(transactionByteArray);
                  redisCatchStorage.updateSendRTPSever(sendRtpItem);
                  sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
                    sendStreamAck(responseSendItemMsg.getMediaServerItem(), sendRtpItem, platform, evt);
                }, (wvpResult) -> {
                    try {
                        // 错误
                        if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
                            // 离线
                            // 查询是否在本机上线了
                            StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
                            if (currentStreamPushItem.isStatus()) {
                                // 在线状态
                                pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            } else {
                                // 不在线 拉起
                                notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            }
                        }
                    } catch (InvalidArgumentException e) {
                        throw new RuntimeException(e);
                    } catch (ParseException e) {
                        throw new RuntimeException(e);
                    } catch (SipException e) {
                        throw new RuntimeException(e);
               }
            }
         }
      } catch (SipException | InvalidArgumentException | ParseException e) {
                    try {
                        responseAck(evt, Response.BUSY_HERE);
                    } catch (SipException e) {
         e.printStackTrace();
         logger.warn("sdp解析错误");
                    } catch (InvalidArgumentException e) {
         e.printStackTrace();
      } catch (SdpParseException e) {
         e.printStackTrace();
      } catch (SdpException e) {
                    } catch (ParseException e) {
         e.printStackTrace();
      }
                    return;
                });
   }
   public void sendStreamAck(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt){
@@ -569,16 +724,10 @@
      }
   }
   public void inviteFromDeviceHandle(RequestEvent evt, String requesterId, String channelId1) throws InvalidArgumentException, ParseException, SipException, SdpException {
    public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException {
      // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
      Device device = redisCatchStorage.getDevice(requesterId);
      AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(requesterId, channelId1);
      if (audioBroadcastCatch == null) {
         logger.warn("来自设备的Invite请求非语音广播,已忽略");
         responseAck(evt, Response.FORBIDDEN);
         return;
      }
      Request request = evt.getRequest();
      if (device != null) {
         logger.info("收到设备" + requesterId + "的语音广播Invite请求");
@@ -591,7 +740,7 @@
         int ssrcIndex = contentString.indexOf("y=");
         if (ssrcIndex > 0) {
            substring = contentString.substring(0, ssrcIndex);
            ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12).trim();
                ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
         }
         ssrcIndex = substring.indexOf("f=");
         if (ssrcIndex > 0) {
@@ -601,9 +750,9 @@
         //  获取支持的格式
         Vector mediaDescriptions = sdp.getMediaDescriptions(true);
         // 查看是否支持PS 负载96
         int port = -1;
            //boolean recvonly = false;
         boolean mediaTransmissionTCP = false;
         Boolean tcpActive = null;
         for (int i = 0; i < mediaDescriptions.size(); i++) {
@@ -635,30 +784,15 @@
            responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415
            return;
         }
            String username = sdp.getOrigin().getUsername();
         String addressStr = sdp.getOrigin().getAddress();
         logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", requesterId, addressStr, port, ssrc);
            logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc);
         MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device);
         if (mediaServerItem == null) {
            logger.warn("未找到可用的zlm");
            responseAck(evt, Response.BUSY_HERE);
            return;
        } else {
            logger.warn("来自无效设备/平台的请求");
            responseAck(evt, Response.BAD_REQUEST);
         }
         SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
               device.getDeviceId(), audioBroadcastCatch.getChannelId(),
               mediaTransmissionTCP);
         if (sendRtpItem == null) {
            logger.warn("服务器端口资源不足");
            responseAck(evt, Response.BUSY_HERE);
            return;
         }
         sendRtpItem.setTcp(mediaTransmissionTCP);
         if (tcpActive != null) {
            sendRtpItem.setTcpActive(tcpActive);
         }
         String app = "broadcast";
         String stream = device.getDeviceId() + "_" + audioBroadcastCatch.getChannelId();
         CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
         sendRtpItem.setPlayType(InviteStreamType.PLAY);
         sendRtpItem.setCallId(callIdHeader.getCallId());