648540858
2022-09-24 d7a1b94f905c5f28c9c8f2d48c3f9e28ebcf9cc4
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -19,8 +19,6 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.PostMapping;
@@ -105,7 +103,7 @@
   @PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8")
   public JSONObject onServerKeepalive(@RequestBody JSONObject json){
      logger.info("[ ZLM HOOK ] on_server_keepalive API调用,参数:" + json.toString());
      logger.info("[ ZLM HOOK ]on_server_keepalive API调用,参数:" + json.toString());
      String mediaServerId = json.getString("mediaServerId");
      List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
      if (subscribes != null  && subscribes.size() > 0) {
@@ -419,10 +417,11 @@
      String schema = item.getSchema();
      List<MediaItem.MediaTrack> tracks = item.getTracks();
      boolean regist = item.isRegist();
      if (item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
            || item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
            || item.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
         if (regist) {
      if (regist) {
         if (item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
               || item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
               || item.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
            StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
            if (streamAuthorityInfo == null) {
               streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(item);
@@ -431,9 +430,9 @@
               streamAuthorityInfo.setOriginTypeStr(item.getOriginTypeStr());
            }
            redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo);
         }else {
            redisCatchStorage.removeStreamAuthorityInfo(app, stream);
         }
      }else {
         redisCatchStorage.removeStreamAuthorityInfo(app, stream);
      }
      if ("rtsp".equals(schema)){
@@ -453,15 +452,12 @@
            if (streamInfo!=null){
               redisCatchStorage.stopPlay(streamInfo);
               storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
               // 如果正在给上级推送,则发送bye
            }else{
               streamInfo = redisCatchStorage.queryPlayback(null, null, stream, null);
               if (streamInfo != null) {
                  redisCatchStorage.stopPlayback(streamInfo.getDeviceID(), streamInfo.getChannelId(),
                        streamInfo.getStream(), null);
               }
               // 如果正在给上级推送,则发送bye
            }
         }else {
            if (!"rtp".equals(app)){
@@ -511,6 +507,19 @@
               }
            }
         }
         if (!regist) {
            List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(stream);
            if (sendRtpItems.size() > 0) {
               for (SendRtpItem sendRtpItem : sendRtpItems) {
                  if (sendRtpItem.getApp().equals(app)) {
                     String platformId = sendRtpItem.getPlatformId();
                     ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
                     commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
                  }
               }
            }
         }
      }
      JSONObject ret = new JSONObject();
@@ -544,6 +553,8 @@
                  for (SendRtpItem sendRtpItem : sendRtpItems) {
                     ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
                     commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
                     redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
                           sendRtpItem.getCallId(), sendRtpItem.getStreamId());
                  }
               }
            }
@@ -573,13 +584,19 @@
         return ret;
      }else {
         StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
         if (streamProxyItem != null && streamProxyItem.isEnable_remove_none_reader()) {
            ret.put("close", true);
            streamProxyService.del(app, streamId);
            String url = streamProxyItem.getUrl() != null?streamProxyItem.getUrl():streamProxyItem.getSrc_url();
            logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除",  app, streamId, url);
         }else {
            ret.put("close", false);
         if (streamProxyItem != null ) {
            if (streamProxyItem.isEnable_remove_none_reader()) {
               // 无人观看自动移除
               ret.put("close", true);
               streamProxyService.del(app, streamId);
               String url = streamProxyItem.getUrl() != null?streamProxyItem.getUrl():streamProxyItem.getSrc_url();
               logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除",  app, streamId, url);
            }else if (streamProxyItem.isEnable_disable_none_reader()) {
               // 无人观看停用
               ret.put("close", true);
            }else {
               ret.put("close", false);
            }
         }
         return ret;
      }
@@ -626,7 +643,7 @@
   @ResponseBody
   @PostMapping(value = "/on_server_started", produces = "application/json;charset=UTF-8")
   public JSONObject onServerStarted(HttpServletRequest request, @RequestBody JSONObject jsonObject){
      if (logger.isDebugEnabled()) {
         logger.debug("[ ZLM HOOK ]on_server_started API调用,参数:" + jsonObject.toString());
      }
@@ -649,6 +666,39 @@
      return ret;
   }
   /**
    * 发送rtp(startSendRtp)被动关闭时回调
    */
   @ResponseBody
   @PostMapping(value = "/on_send_rtp_stopped", produces = "application/json;charset=UTF-8")
   public JSONObject onSendRtpStopped(HttpServletRequest request, @RequestBody JSONObject jsonObject){
      logger.info("[ ZLM HOOK ]on_send_rtp_stopped API调用,参数:" + jsonObject);
      JSONObject ret = new JSONObject();
      ret.put("code", 0);
      ret.put("msg", "success");
      // 查找对应的上级推流,发送停止
      String app = jsonObject.getString("app");
      if (!"rtp".equals(app)) {
         return ret;
      }
      String stream = jsonObject.getString("stream");
      List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(stream);
      if (sendRtpItems.size() > 0) {
         for (SendRtpItem sendRtpItem : sendRtpItems) {
            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
            commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
            redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
                  sendRtpItem.getCallId(), sendRtpItem.getStreamId());
         }
      }
      return ret;
   }
   private Map<String, String> urlParamToMap(String params) {
      HashMap<String, String> map = new HashMap<>();
      if (ObjectUtils.isEmpty(params)) {