648540858
2023-07-03 43ef195543c087d88ac3eea98067b81d7e2b10c2
Merge branch '2.6.8' into wvp-28181-2.0

# Conflicts:
# src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
# src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java
5个文件已修改
1个文件已添加
375 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java 135 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java 161 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
web_src/src/components/dialog/catalogEdit.vue 59 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -157,6 +157,7 @@
    public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_";
    public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_";
    public static final String WVP_OTHER_SEND_RTP_INFO = "VMP_OTHER_SEND_RTP_INFO_";
    /**
     * Redis Const
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -385,7 +385,9 @@
                        }
                        GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
                        if (gbStream != null) {
                            eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist()?CatalogEvent.ON:CatalogEvent.OFF);
                            if (userSetting.isUsePushingAsStatus()) {
                                eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist()?CatalogEvent.ON:CatalogEvent.OFF);
                            }
                        }
                        if (type != null) {
                            // 发送流变化redis消息
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -304,7 +304,7 @@
    /**
     * Publishes a stream-change message through Redis.
     * The channel is {@code VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX} followed by {@code type}.
     *
     * @param type       suffix appended to the channel prefix
     * @param jsonObject payload passed to {@code redisTemplate.convertAndSend}; must not be null because it is dereferenced for logging
     */
    @Override
    public void sendStreamChangeMsg(String type, JSONObject jsonObject) {
        String key = VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + type;
        // NOTE(review): the two statements below log the same event with near-identical text;
        // the first looks like the superseded wording left over from the merge - confirm only one is wanted.
        logger.info("[redis 流变化事件] {}: {}", key, jsonObject.toString());
        logger.info("[redis 流变化事件] 发送 {}: {}", key, jsonObject.toString());
        redisTemplate.convertAndSend(key, jsonObject);
    }
@@ -540,14 +540,14 @@
    /**
     * Publishes a mobile-position message on the Redis channel
     * {@code VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION}.
     *
     * @param jsonObject payload passed to {@code redisTemplate.convertAndSend}; must not be null because it is dereferenced for logging
     */
    @Override
    public void sendMobilePositionMsg(JSONObject jsonObject) {
        String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION;
        // NOTE(review): the two statements below log the same event with near-identical text;
        // the first looks like the superseded wording left over from the merge - confirm only one is wanted.
        logger.info("[redis发送通知] 移动位置 {}: {}", key, jsonObject.toString());
        logger.info("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString());
        redisTemplate.convertAndSend(key, jsonObject);
    }
    /**
     * Publishes a "push stream requested" message on the Redis channel
     * {@code VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED}.
     *
     * @param msg message whose app and stream are logged and whose {@code JSON.toJSON} form is published
     */
    @Override
    public void sendStreamPushRequestedMsg(MessageForPushChannel msg) {
        String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED;
        // NOTE(review): the two statements below log the same event with near-identical text;
        // the first looks like the superseded wording left over from the merge - confirm only one is wanted.
        logger.info("[redis发送通知] 推流被请求 {}: {}/{}", key, msg.getApp(), msg.getStream());
        logger.info("[redis发送通知] 发送 推流被请求 {}: {}/{}", key, msg.getApp(), msg.getStream());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
@@ -555,7 +555,7 @@
    public void sendAlarmMsg(AlarmChannelMessage msg) {
        // Publishes the JSON form of msg on the Redis channel VM_MSG_SUBSCRIBE_ALARM.
        // This message is used to hand content received from subordinate sources over to integrated third-party services.
        String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM;
        // NOTE(review): the two statements below log the same event with near-identical text;
        // the first looks like the superseded wording left over from the merge - confirm only one is wanted.
        logger.info("[redis发送通知] 报警{}: {}", key, JSON.toJSON(msg));
        logger.info("[redis发送通知] 发送 报警{}: {}", key, JSON.toJSON(msg));
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
@@ -568,7 +568,7 @@
    @Override
    public void sendStreamPushRequestedMsgForStatus() {
        String key = VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED;
        logger.info("[redis通知]获取所有推流设备的状态");
        logger.info("[redis通知] 发送 获取所有推流设备的状态");
        JSONObject jsonObject = new JSONObject();
        jsonObject.put(key, key);
        redisTemplate.convertAndSend(key, jsonObject);
@@ -596,6 +596,7 @@
    @Override
    public void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online) {
        String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_DEVICE_STATUS;
        logger.info("[redis通知] 发送 推送设备/通道状态, {}/{}-{}", deviceId, channelId, online);
        StringBuilder msg = new StringBuilder();
        msg.append(deviceId);
        if (channelId != null) {
@@ -626,14 +627,14 @@
    /**
     * Publishes a "pushed stream is being watched by an upper platform" message on the Redis channel
     * {@code VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY}.
     *
     * @param msg message whose app, stream and platform id are logged and whose {@code JSON.toJSON} form is published
     */
    @Override
    public void sendPlatformStartPlayMsg(MessageForPushChannel msg) {
        String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY;
        // NOTE(review): the two statements below log the same event with near-identical text;
        // the first looks like the superseded wording left over from the merge - confirm only one is wanted.
        logger.info("[redis发送通知] 推流被上级平台观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        logger.info("[redis发送通知] 发送 推流被上级平台观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
    /**
     * Publishes an "upper platform stopped watching" message on the Redis channel
     * {@code VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY}.
     *
     * @param msg message whose app, stream and platform id are logged and whose {@code JSON.toJSON} form is published
     */
    @Override
    public void sendPlatformStopPlayMsg(MessageForPushChannel msg) {
        String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
        // NOTE(review): the two statements below log the same event with near-identical text;
        // the first looks like the superseded wording left over from the merge - confirm only one is wanted.
        logger.info("[redis发送通知] 上级平台停止观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        logger.info("[redis发送通知] 发送 上级平台停止观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
}
src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java
New file
@@ -0,0 +1,135 @@
package com.genersoft.iot.vmp.vmanager.bean;
/**
 * Plain data holder describing an RTP exchange opened on behalf of a third-party service:
 * the address used for sending, the address used for receiving, the session identifiers,
 * and the app/stream/SSRC of the stream that is pushed out.
 */
public class OtherRtpSendInfo {
    /**
     * IP used for sending the stream
     */
    private String ip;
    /**
     * Port used for sending the stream
     */
    private int port;
    /**
     * IP used for receiving the stream
     */
    private String receiveIp;
    /**
     * Port used for receiving the stream
     */
    private int receivePort;
    /**
     * Session ID
     */
    private String callId;
    /**
     * Stream ID
     */
    private String stream;
    /**
     * App name of the pushed stream
     */
    private String pushApp;
    /**
     * Stream ID of the pushed stream
     */
    private String pushStream;
    /**
     * SSRC of the pushed stream
     */
    private String pushSSRC;

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getReceiveIp() {
        return receiveIp;
    }

    public void setReceiveIp(String receiveIp) {
        this.receiveIp = receiveIp;
    }

    public int getReceivePort() {
        return receivePort;
    }

    public void setReceivePort(int receivePort) {
        this.receivePort = receivePort;
    }

    public String getCallId() {
        return callId;
    }

    public void setCallId(String callId) {
        this.callId = callId;
    }

    public String getStream() {
        return stream;
    }

    public void setStream(String stream) {
        this.stream = stream;
    }

    public String getPushApp() {
        return pushApp;
    }

    public void setPushApp(String pushApp) {
        this.pushApp = pushApp;
    }

    public String getPushStream() {
        return pushStream;
    }

    public void setPushStream(String pushStream) {
        this.pushStream = pushStream;
    }

    public String getPushSSRC() {
        return pushSSRC;
    }

    public void setPushSSRC(String pushSSRC) {
        this.pushSSRC = pushSSRC;
    }

    /**
     * Renders every field. The leading part of the text is unchanged from the earlier
     * six-field form; the three push fields are appended so logged instances show their full state.
     */
    @Override
    public String toString() {
        return "OtherRtpSendInfo{" +
                "ip='" + ip + '\'' +
                ", port=" + port +
                ", receiveIp='" + receiveIp + '\'' +
                ", receivePort=" + receivePort +
                ", callId='" + callId + '\'' +
                ", stream='" + stream + '\'' +
                ", pushApp='" + pushApp + '\'' +
                ", pushStream='" + pushStream + '\'' +
                ", pushSSRC='" + pushSSRC + '\'' +
                '}';
    }
}
src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java
@@ -1,25 +1,43 @@
package com.genersoft.iot.vmp.vmanager.rtp;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.VersionInfo;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@SuppressWarnings("rawtypes")
@Tag(name = "第三方服务对接")
@@ -31,8 +49,10 @@
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    private final static Logger logger = LoggerFactory.getLogger(RtpController.class);
    @Autowired
    private ZlmHttpHookSubscribe zlmHttpHookSubscribe;
    private ZlmHttpHookSubscribe hookSubscribe;
    @Autowired
    private IMediaServerService mediaServerService;
@@ -53,11 +73,11 @@
    private IDeviceChannelService channelService;
    @Autowired
    private IStreamPushService pushService;
    private DynamicTask dynamicTask;
    @Autowired
    private IStreamProxyService proxyService;
    private RedisTemplate<Object, Object> redisTemplate;
    @Value("${server.port}")
@@ -77,12 +97,76 @@
    @Parameter(name = "stream", description = "形成的流的ID", required = true)
    @Parameter(name = "tcpMode", description = "收流模式, 0为UDP, 1为TCP被动", required = true)
    @Parameter(name = "callBack", description = "回调地址,如果收流超时会通道回调通知,回调为get请求,参数为callId", required = true)
    public SendRtpItem openRtpServer(Boolean isSend, String ssrc, String callId, String stream, Integer tcpMode, String callBack) {
        MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
    public OtherRtpSendInfo openRtpServer(Boolean isSend, String ssrc, String callId, String stream, Integer tcpMode, String callBack) {
        logger.info("[第三方服务对接->开启收流和获取发流信息] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}",
                isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP被动", callBack);
        MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
        if (mediaServerItem == null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer");
        }
        return null;
        if (stream == null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(),"stream参数不可为空");
        }
        if (isSend != null && isSend && callId == null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(),"isSend为true时,CallID不能为空");
        }
        int ssrcInt = 0;
        if (ssrc != null) {
            try {
                ssrcInt = Integer.parseInt(ssrc);
            }catch (NumberFormatException e) {
                throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc格式错误");
            }
        }
        int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode);
        // 注册回调如果rtp收流超时则通过回调发送通知
        if (callBack != null) {
            HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, mediaServerItem.getId());
            // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
            hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
                    (mediaServerItemInUse, response)->{
                        if (stream.equals(response.getString("stream_id"))) {
                            logger.info("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId);
                            OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
                            OkHttpClient client = httpClientBuilder.build();
                            String url = callBack + "?callId="  + callId;
                            Request request = new Request.Builder().get().url(url).build();
                            try {
                                client.newCall(request).execute();
                            } catch (IOException e) {
                                logger.error("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
                            }
                        }
                    });
        }
        OtherRtpSendInfo otherRtpSendInfo = new OtherRtpSendInfo();
        otherRtpSendInfo.setReceiveIp(mediaServerItem.getSdpIp());
        otherRtpSendInfo.setReceivePort(localPort);
        otherRtpSendInfo.setCallId(callId);
        otherRtpSendInfo.setStream(stream);
        if (isSend != null && isSend) {
            String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
            // 预创建发流信息
            int port = zlmServerFactory.keepPort(mediaServerItem, callId, 0, ssrc1 -> {
                return redisTemplate.opsForValue().get(key) != null;
            });
            // 将信息写入redis中,以备后用
            redisTemplate.opsForValue().set(key, otherRtpSendInfo);
            // 设置超时任务,超时未使用,则自动移除,并关闭端口保持, 默认五分钟
            dynamicTask.startDelay(key, ()->{
                logger.info("[第三方服务对接->开启收流和获取发流信息] 端口保持超时 callId->{}", callId);
                redisTemplate.delete(key);
                zlmServerFactory.releasePort(mediaServerItem, callId);
            }, 300000);
            otherRtpSendInfo.setIp(mediaServerItem.getSdpIp());
            otherRtpSendInfo.setPort(port);
            logger.info("[开启收流和获取发流信息] 结果,callId->{}, {}", callId, otherRtpSendInfo);
        }
        return otherRtpSendInfo;
    }
    @GetMapping(value = "/receive/close")
@@ -90,7 +174,9 @@
    @Operation(summary = "关闭收流")
    @Parameter(name = "stream", description = "流的ID", required = true)
    public void closeRtpServer(String stream) {
        logger.info("[第三方服务对接->关闭收流] stream->{}", stream);
        MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
        zlmServerFactory.closeRtpServer(mediaServerItem,stream);
    }
    @GetMapping(value = "/send/start")
@@ -103,9 +189,46 @@
    @Parameter(name = "stream", description = "待发送流Id", required = true)
    @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true)
    @Parameter(name = "onlyAudio", description = "是否只有音频", required = true)
    @Parameter(name = "isUdp", description = "是否为UDP", required = true)
    @Parameter(name = "streamType", description = "流类型,1为es流,2为ps流, 默认es流", required = false)
    public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Integer streamType) {
    public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, Integer streamType) {
        logger.info("[第三方服务对接->发送流] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}",
                ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS");
        MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
        String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
        OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
        if (sendInfo != null) {
            zlmServerFactory.releasePort(mediaServerItem, sendInfo.getCallId());
        }else {
            sendInfo = new OtherRtpSendInfo();
        }
        sendInfo.setPushApp(app);
        sendInfo.setPushStream(stream);
        sendInfo.setPushSSRC(ssrc);
        Map<String, Object> param = new HashMap<>(12);
        param.put("vhost","__defaultVhost__");
        param.put("app",app);
        param.put("stream",stream);
        param.put("ssrc", ssrc);
        param.put("dst_url",ip);
        param.put("dst_port", port);
        String is_Udp = isUdp ? "1" : "0";
        param.put("is_udp", is_Udp);
        param.put("src_port", sendInfo.getPort());
        param.put("use_ps", streamType==2 ? "1" : "0");
        param.put("only_audio", onlyAudio ? "1" : "0");
        JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
        if (jsonObject.getInteger("code") == 0) {
            logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId);
            redisTemplate.opsForValue().set(key, sendInfo);
        }else {
            redisTemplate.delete(key);
            logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg"));
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg"));
        }
    }
@@ -115,7 +238,25 @@
    @Operation(summary = "关闭发送流")
    @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true)
    public void closeSendRTP(String callId) {
        logger.info("[第三方服务对接->关闭发送流] callId->{}", callId);
        String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
        OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
        if (sendInfo == null){
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未开启发流");
        }
        Map<String, Object> param = new HashMap<>();
        param.put("vhost","__defaultVhost__");
        param.put("app",sendInfo.getPushApp());
        param.put("stream",sendInfo.getPushStream());
        param.put("ssrc",sendInfo.getPushSSRC());
        MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
        Boolean result = zlmServerFactory.stopSendRtpStream(mediaServerItem, param);
        if (!result) {
            logger.info("[第三方服务对接->关闭发送流] 失败 callId->{}", callId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "停止发流失败");
        }else {
            logger.info("[第三方服务对接->关闭发送流] 成功 callId->{}", callId);
        }
    }
}
web_src/src/components/dialog/catalogEdit.vue
@@ -12,15 +12,6 @@
    >
      <div id="shared" style="margin-top: 1rem;margin-right: 100px;">
        <el-form ref="form" :rules="rules" :model="form" label-width="140px" >
<!--          <el-form-item >-->
<!--            建议的类型:-->
<!--            <br/>-->
<!--            &emsp;&emsp;行政区划(可选2位/4位/6位/8位/10位数字,例如:130432,表示河北省邯郸市广平县)-->
<!--            <br/>-->
<!--            &emsp;&emsp;业务分组(第11、12、13位215,例如:34020000002150000001)-->
<!--            <br/>-->
<!--            &emsp;&emsp;虚拟组织(第11、12、13位216,例如:34020000002160000001)-->
<!--          </el-form-item>-->
          <el-form-item label="节点编号" prop="id" >
            <el-input v-model="form.id" :disabled="isEdit" clearable></el-input>
          </el-form-item>
@@ -63,7 +54,11 @@
          return callback(new Error('行政区划编号必须为2/4/6/8位'));
        }
        if (this.form.parentId !== this.platformDeviceId && this.form.parentId.length >= value.trim().length) {
          return callback(new Error('行政区划编号长度应该每次两位递增'));
          if (this.form.parentId.length === 20) {
            return callback(new Error('业务分组/虚拟组织下不可创建行政区划'));
          }else {
            return callback(new Error('行政区划编号长度应该每次两位递增'));
          }
        }
      }else {
        if (value.trim().length !== 20) {
@@ -122,27 +117,31 @@
      this.level = level;
    },
    onSubmit: function () {
      console.log("onSubmit");
      console.log(this.form);
      this.$axios({
        method:"post",
        url:`/api/platform/catalog/${!this.isEdit? "add":"edit"}`,
        data: this.form
      }).then((res)=> {
          if (res.data.code === 0) {
            if (this.submitCallback)this.submitCallback(this.form)
          }else {
            this.$message({
              showClose: true,
              message: res.data.msg,
              type: "error",
      this.$refs["form"].validate((valid) => {
        if (valid) {
          this.$axios({
            method:"post",
            url:`/api/platform/catalog/${!this.isEdit? "add":"edit"}`,
            data: this.form
          }).then((res)=> {
            if (res.data.code === 0) {
              if (this.submitCallback)this.submitCallback(this.form)
            }else {
              this.$message({
                showClose: true,
                message: res.data.msg,
                type: "error",
              });
            }
            this.close();
          })
            .catch((error)=> {
              console.log(error);
            });
          }
          this.close();
        })
        .catch((error)=> {
          console.log(error);
        });
        } else {
          return false;
        }
      });
    },
    close: function () {
      this.isEdit = false;