panlinlin
2021-04-27 a3649ca243f827f78b85dbb41af2c3d7d978aa89
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java
@@ -12,18 +12,15 @@
import javax.sip.message.Response;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult;
import com.genersoft.iot.vmp.vmanager.service.IPlayService;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import com.genersoft.iot.vmp.service.IPlayService;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
import org.slf4j.Logger;
@@ -89,16 +86,23 @@
         }
         // 查询请求方是否上级平台
         ParentPlatform platform = storager.queryParentPlatById(requesterId);
         ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
         if (platform != null) {
            // 查询平台下是否有该通道
            DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
            if (channel == null) {
            GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
            // 不是通道可能是直播流
            if (channel != null || gbStream != null ) {
               if (channel.getStatus() == 0) {
                  logger.info("通道离线,返回400");
                  responseAck(evt, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline");
                  return;
               }
               responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
            }else {
               logger.info("通道不存在,返回404");
               responseAck(evt, Response.NOT_FOUND); // 通道不存在,发404,资源不存在
               return;
            }else {
               responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
            }
            // 解析sdp消息, 使用jainsip 自带的sdp解析方式
            String contentString = new String(request.getRawContent());
@@ -153,67 +157,120 @@
            String addressStr = sdp.getOrigin().getAddress();
            //String sessionName = sdp.getSessionName().getValue();
            logger.info("[上级点播]用户:{}, 地址:{}:{}, ssrc:{}", username, addressStr, port, ssrc);
            Device device  = null;
            // Whether channel or gbStream is non-null tells us if the source is a GB28181 device channel or a live stream
            if (channel != null) {
               device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId);
               if (device == null) {
                  logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel);
                  responseAck(evt, Response.SERVER_INTERNAL_ERROR);
                  return;
               }
               SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(addressStr, port, ssrc, requesterId,
                     device.getDeviceId(), channelId,
                     mediaTransmissionTCP);
               if (tcpActive != null) {
                  sendRtpItem.setTcpActive(tcpActive);
               }
               if (sendRtpItem == null) {
                  logger.warn("服务器端口资源不足");
                  responseAck(evt, Response.BUSY_HERE);
                  return;
               }
            Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId);
            if (device == null) {
               logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel);
               responseAck(evt, Response.SERVER_INTERNAL_ERROR);
               return;
            }
            SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(addressStr, port, ssrc, requesterId, device.getDeviceId(), channelId,
                  mediaTransmissionTCP);
            if (tcpActive != null) {
               sendRtpItem.setTcpActive(tcpActive);
            }
            if (sendRtpItem == null) {
               logger.warn("服务器端口资源不足");
               responseAck(evt, Response.BUSY_HERE);
               return;
            }
            // 写入redis, 超时时回复
            redisCatchStorage.updateSendRTPSever(sendRtpItem);
            // 通知下级推流,
            PlayResult playResult = playService.play(device.getDeviceId(), channelId, (responseJSON)->{
               // 收到推流, 回复200OK, 等待ack
               sendRtpItem.setStatus(1);
               // 写入redis, 超时时回复
               redisCatchStorage.updateSendRTPSever(sendRtpItem);
               // TODO 添加对tcp的支持
               MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
               StringBuffer content = new StringBuffer(200);
               content.append("v=0\r\n");
               content.append("o="+"00000"+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n");
               content.append("s=Play\r\n");
               content.append("c=IN IP4 "+mediaInfo.getWanIp()+"\r\n");
               content.append("t=0 0\r\n");
               content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
               content.append("a=sendonly\r\n");
               content.append("a=rtpmap:96 PS/90000\r\n");
               content.append("y="+ ssrc + "\r\n");
               content.append("f=\r\n");
               // 通知下级推流,
               PlayResult playResult = playService.play(device.getDeviceId(), channelId, (responseJSON)->{
                  // 收到推流, 回复200OK, 等待ack
                  // if (sendRtpItem == null) return;
                  sendRtpItem.setStatus(1);
                  redisCatchStorage.updateSendRTPSever(sendRtpItem);
                  // TODO 添加对tcp的支持
                  MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
                  StringBuffer content = new StringBuffer(200);
                  content.append("v=0\r\n");
                  content.append("o="+"00000"+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n");
                  content.append("s=Play\r\n");
                  content.append("c=IN IP4 "+mediaInfo.getWanIp()+"\r\n");
                  content.append("t=0 0\r\n");
                  content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
                  content.append("a=sendonly\r\n");
                  content.append("a=rtpmap:96 PS/90000\r\n");
                  content.append("y="+ ssrc + "\r\n");
                  content.append("f=\r\n");
               try {
                  responseAck(evt, content.toString());
               } catch (SipException e) {
                  e.printStackTrace();
               } catch (InvalidArgumentException e) {
                  e.printStackTrace();
               } catch (ParseException e) {
                  e.printStackTrace();
                  try {
                     responseAck(evt, content.toString());
                  } catch (SipException e) {
                     e.printStackTrace();
                  } catch (InvalidArgumentException e) {
                     e.printStackTrace();
                  } catch (ParseException e) {
                     e.printStackTrace();
                  }
               } ,(event -> {
                  // 未知错误。直接转发设备点播的错误
                  Response response = null;
                  try {
                     response = getMessageFactory().createResponse(event.getResponse().getStatusCode(), evt.getRequest());
                     getServerTransaction(evt).sendResponse(response);
                  } catch (ParseException | SipException | InvalidArgumentException e) {
                     e.printStackTrace();
                  }
               }));
               if (logger.isDebugEnabled()) {
                  logger.debug(playResult.getResult().toString());
               }
            },(event -> {
               // 未知错误。直接转发设备点播的错误
               Response response = null;
               try {
                  response = getMessageFactory().createResponse(event.getResponse().getStatusCode(), evt.getRequest());
                  getServerTransaction(evt).sendResponse(response);
               } catch (ParseException | SipException | InvalidArgumentException e) {
                  e.printStackTrace();
            }else if (gbStream != null) {
               SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(addressStr, port, ssrc, requesterId,
                     gbStream.getApp(), gbStream.getStream(), channelId,
                     mediaTransmissionTCP);
               if (tcpActive != null) {
                  sendRtpItem.setTcpActive(tcpActive);
               }
            }));
            if (logger.isDebugEnabled()) {
               logger.debug(playResult.getResult().toString());
               if (sendRtpItem == null) {
                  logger.warn("服务器端口资源不足");
                  responseAck(evt, Response.BUSY_HERE);
                  return;
               }
               // 写入redis, 超时时回复
               redisCatchStorage.updateSendRTPSever(sendRtpItem);
               // 检测直播流是否在线
               Boolean streamReady = zlmrtpServerFactory.isStreamReady(gbStream.getApp(), gbStream.getStream());
               if (streamReady) {
                  sendRtpItem.setStatus(1);
                  redisCatchStorage.updateSendRTPSever(sendRtpItem);
                  // TODO 添加对tcp的支持
                  MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
                  StringBuffer content = new StringBuffer(200);
                  content.append("v=0\r\n");
                  content.append("o="+"00000"+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n");
                  content.append("s=Play\r\n");
                  content.append("c=IN IP4 "+mediaInfo.getWanIp()+"\r\n");
                  content.append("t=0 0\r\n");
                  content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
                  content.append("a=sendonly\r\n");
                  content.append("a=rtpmap:96 PS/90000\r\n");
                  content.append("y="+ ssrc + "\r\n");
                  content.append("f=\r\n");
                  try {
                     responseAck(evt, content.toString());
                  } catch (SipException e) {
                     e.printStackTrace();
                  } catch (InvalidArgumentException e) {
                     e.printStackTrace();
                  } catch (ParseException e) {
                     e.printStackTrace();
                  }
               }
            }
         } else {
            // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
            Device device = storager.queryVideoDevice(requesterId);
@@ -298,6 +355,7 @@
      }
   }
   /***
    * 回复状态码
    * 100 trying
@@ -314,6 +372,12 @@
      getServerTransaction(evt).sendResponse(response);
   }
   private void responseAck(RequestEvent evt, int statusCode, String msg) throws SipException, InvalidArgumentException, ParseException {
      Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
      response.setReasonPhrase(msg);
      getServerTransaction(evt).sendResponse(response);
   }
   /**
    * 回复带sdp的200
    * @param evt