648540858
2022-01-20 7d888274150e9415ae0fc16e67fefd1cdb4a69cc
处理服务重启或设备重新上线时的订阅,优化通道导入重复的处理
13个文件已修改
1个文件已添加
259 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/runner/SipDeviceRunner.java 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java 73 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
web_src/src/components/PushVideoList.vue 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
web_src/src/components/dialog/importChannel.vue 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
web_src/src/components/dialog/importChannelShowErrorData.vue 64 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
web_src/src/components/dialog/jessibuca.vue 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
@@ -3,7 +3,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import java.util.Map;
@@ -40,4 +39,8 @@
        }
    }
    public boolean contains(String key) {
        return futureMap.get(key) != null;
    }
}
src/main/java/com/genersoft/iot/vmp/conf/runner/SipDeviceRunner.java
@@ -1,7 +1,8 @@
package com.genersoft.iot.vmp.conf.runner;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.springframework.beans.factory.annotation.Autowired;
@@ -28,6 +29,9 @@
    @Autowired
    private UserSetup userSetup;
    @Autowired
    private IDeviceService deviceService;
    @Override
    public void run(String... args) throws Exception {
        // 读取redis没有心跳信息的则设置为离线,等收到下次心跳设置为在线
@@ -36,9 +40,15 @@
        List<String> onlineForAll = redisCatchStorage.getOnlineForAll();
        for (String deviceId : onlineForAll) {
            storager.online(deviceId);
            Device device = redisCatchStorage.getDevice(deviceId);
            if (device != null && device.getSubscribeCycleForCatalog() > 0) {
                // 查询在线设备那些开启了订阅,为设备开启定时的目录订阅
                deviceService.addCatalogSubscribe(device);
            }
        }
        // 重置cseq计数
        redisCatchStorage.resetAllCSEQ();
        // TODO 查询在线设备那些开启了订阅,为设备开启定时的目录订阅
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java
@@ -35,6 +35,9 @@
    @Autowired
    private IVideoManagerStorager storager;
    @Autowired
    private IDeviceService deviceService;
    
    @Autowired
    private RedisUtil redis;
@@ -57,6 +60,7 @@
            logger.debug("设备上线事件触发,deviceId:" + event.getDevice().getDeviceId() + ",from:" + event.getFrom());
        }
        Device device = event.getDevice();
        if (device == null) return;
        String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + userSetup.getServerId() + "_" + event.getDevice().getDeviceId();
        switch (event.getFrom()) {
@@ -84,15 +88,18 @@
        }
        device.setOnline(1);
        Device deviceInstore = storager.queryVideoDevice(device.getDeviceId());
        if (deviceInstore != null && deviceInstore.getOnline() == 0) {
        Device deviceInStore = storager.queryVideoDevice(device.getDeviceId());
        if (deviceInStore != null && deviceInStore.getOnline() == 0) {
            List<DeviceChannel> deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId());
            eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON);
        }
        // 处理上线监听
        storager.updateDevice(device);
        // TODO 上线添加订阅
        // 上线添加订阅
        if (device.getSubscribeCycleForCatalog() > 0) {
            deviceService.addCatalogSubscribe(device);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
@@ -18,6 +18,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.*;
@@ -76,12 +77,12 @@
            }else if (event.getGbStreams() != null) {
                if (platforms.size() > 0) {
                    for (GbStream gbStream : event.getGbStreams()) {
                        if (gbStream == null || StringUtils.isEmpty(gbStream.getGbId())) continue;
                        List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForStreamWithGBId(gbStream.getApp(),gbStream.getStream(), platforms);
                        parentPlatformMap.put(gbStream.getGbId(), parentPlatformsForGB);
                    }
                }
            }
        }
        switch (event.getType()) {
            case CatalogEvent.ON:
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java
@@ -41,6 +41,8 @@
    public static final String CALLBACK_CMD_STOP = "CALLBACK_STOP";
    public static final String UPLOAD_FILE_CHANNEL = "UPLOAD_FILE_CHANNEL";
    public static final String CALLBACK_CMD_MOBILEPOSITION = "CALLBACK_MOBILEPOSITION";
    public static final String CALLBACK_CMD_PRESETQUERY = "CALLBACK_PRESETQUERY";
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
@@ -80,17 +80,14 @@
            Element rootElement = getRootElement(evt);
            String cmd = XmlUtil.getText(rootElement, "CmdType");
            if (CmdType.MOBILE_POSITION.equals(cmd)) {
                logger.info("接收到MobilePosition订阅");
                processNotifyMobilePosition(evt, rootElement);
//            } else if (CmdType.ALARM.equals(cmd)) {
//                logger.info("接收到Alarm订阅");
//                processNotifyAlarm(evt, rootElement);
            } else if (CmdType.CATALOG.equals(cmd)) {
                logger.info("接收到Catalog订阅");
                processNotifyCatalogList(evt, rootElement);
            } else {
                logger.info("接收到消息:" + cmd);
//                responseAck(evt, Response.OK);
                Response response = null;
                response = getMessageFactory().createResponse(200, request);
@@ -132,7 +129,7 @@
        SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
        String sn = XmlUtil.getText(rootElement, "SN");
        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() +  "_MobilePosition_" + platformId;
        logger.info("接收到{}的MobilePosition订阅", platformId);
        StringBuilder resultXml = new StringBuilder(200);
        resultXml.append("<?xml version=\"1.0\" ?>\r\n")
                .append("<Response>\r\n")
@@ -182,7 +179,7 @@
        SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
        String sn = XmlUtil.getText(rootElement, "SN");
        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() +  "_Catalog_" + platformId;
        logger.info("接收到{}的Catalog订阅", platformId);
        StringBuilder resultXml = new StringBuilder(200);
        resultXml.append("<?xml version=\"1.0\" ?>\r\n")
                .append("<Response>\r\n")
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -30,11 +30,15 @@
        if (device == null || device.getSubscribeCycleForCatalog() < 0) {
            return false;
        }
        if (dynamicTask.contains(device.getDeviceId())) {
            logger.info("[添加目录订阅] 设备{}的目录订阅以存在", device.getDeviceId());
            return false;
        }
        logger.info("[添加目录订阅] 设备{}", device.getDeviceId());
        // 添加目录订阅
        CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander);
        catalogSubscribeTask.run();
        // 提前开始刷新订阅
        // TODO 使用jain sip的当时刷新订阅
        int subscribeCycleForCatalog = device.getSubscribeCycleForCatalog();
        // 设置最小值为30
        subscribeCycleForCatalog = Math.max(subscribeCycleForCatalog, 30);
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java
@@ -14,15 +14,23 @@
public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPushExcelDto> {
    private ErrorDataHandler errorDataHandler;
    private IStreamPushService pushService;
    private String defaultMediaServerId;
    private List<StreamPushItem> streamPushItems = new ArrayList<>();
    private Set<String> streamPushStreamSet = new HashSet<>();
    private Set<String> streamPushGBSet = new HashSet<>();
    private List<String> errorStreamList = new ArrayList<>();
    private List<String> errorGBList = new ArrayList<>();
    public StreamPushUploadFileHandler(IStreamPushService pushService, String defaultMediaServerId) {
    public StreamPushUploadFileHandler(IStreamPushService pushService, String defaultMediaServerId, ErrorDataHandler errorDataHandler) {
        this.pushService = pushService;
        this.defaultMediaServerId = defaultMediaServerId;
        this.errorDataHandler = errorDataHandler;
    }
    public interface ErrorDataHandler{
        void handle(List<String> streams, List<String> gbId);
    }
    @Override
@@ -32,9 +40,16 @@
                || StringUtils.isEmpty(streamPushExcelDto.getGbId())) {
            return;
        }
        if (streamPushGBSet.contains(streamPushExcelDto.getGbId())) {
            errorGBList.add(streamPushExcelDto.getGbId());
        }
        if (streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream())) {
            errorStreamList.add(streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream());
        }
        if (streamPushGBSet.contains(streamPushExcelDto.getGbId()) || streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream())) {
            return;
        }
        StreamPushItem streamPushItem = new StreamPushItem();
        streamPushItem.setApp(streamPushExcelDto.getApp());
        streamPushItem.setStream(streamPushExcelDto.getStream());
@@ -60,8 +75,11 @@
    @Override
    public void doAfterAllAnalysed(AnalysisContext analysisContext) {
        // 这里也要保存数据,确保最后遗留的数据也存储到数据库
        pushService.batchAdd(streamPushItems);
        if (streamPushItems.size() > 0) {
            pushService.batchAdd(streamPushItems);
        }
        streamPushGBSet.clear();
        streamPushStreamSet.clear();
        errorDataHandler.handle(errorStreamList, errorGBList);
    }
}
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
@@ -50,7 +50,7 @@
    StreamPushItem selectOne(String app, String stream);
    @Insert("<script>"  +
            "INSERT INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " +
            "REPLACE INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " +
            "createStamp, aliveSecond, mediaServerId) " +
            "VALUES <foreach collection='streamPushItems' item='item' index='index' separator=','>" +
            "( '${item.app}', '${item.stream}', '${item.totalReaderCount}', '${item.originType}', " +
src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java
@@ -4,11 +4,14 @@
import com.alibaba.excel.ExcelReader;
import com.alibaba.excel.read.metadata.ReadSheet;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.impl.StreamPushUploadFileHandler;
import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
@@ -18,12 +21,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@Api(tags = "推流信息管理")
@Controller
@@ -38,6 +48,9 @@
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    private DeferredResultHolder resultHolder;
    @ApiOperation("推流列表查询")
    @ApiImplicitParams({
@@ -103,10 +116,44 @@
    }
    @PostMapping(value = "upload")
    @ResponseBody
    public String uploadChannelFile(@RequestParam(value = "file") MultipartFile file){
    public DeferredResult<ResponseEntity<WVPResult<Object>>> uploadChannelFile(@RequestParam(value = "file") MultipartFile file){
        // 最多处理文件一个小时
        DeferredResult<ResponseEntity<WVPResult<Object>>> result = new DeferredResult<>(60*60*1000L);
        // 录像查询以channelId作为deviceId查询
        String key = DeferredResultHolder.UPLOAD_FILE_CHANNEL;
        String uuid = UUID.randomUUID().toString();
        if (file.isEmpty()) {
            return "fail";
            logger.warn("通道导入文件为空");
            WVPResult<Object> wvpResult = new WVPResult<>();
            wvpResult.setCode(-1);
            wvpResult.setMsg("文件为空");
            result.setResult(ResponseEntity.status(HttpStatus.BAD_REQUEST).body(wvpResult));
            return result;
        }
        // 同时只处理一个文件
        if (resultHolder.exist(key, null)) {
            logger.warn("已有导入任务正在执行");
            WVPResult<Object> wvpResult = new WVPResult<>();
            wvpResult.setCode(-1);
            wvpResult.setMsg("已有导入任务正在执行");
            result.setResult(ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(wvpResult));
            return result;
        }
        resultHolder.put(key, uuid, result);
        result.onTimeout(()->{
            logger.warn("通道导入超时,可能文件过大");
            RequestMessage msg = new RequestMessage();
            msg.setKey(key);
            WVPResult<Object> wvpResult = new WVPResult<>();
            wvpResult.setCode(-1);
            wvpResult.setMsg("导入超时,可能文件过大");
            msg.setData(wvpResult);
            resultHolder.invokeAllResult(msg);
        });
        //获取文件流
        InputStream inputStream = null;
        try {
@@ -117,11 +164,29 @@
        }
        //传入参数
        ExcelReader excelReader = EasyExcel.read(inputStream, StreamPushExcelDto.class,
                new StreamPushUploadFileHandler(streamPushService, mediaServerService.getDefaultMediaServer().getId())).build();
                new StreamPushUploadFileHandler(streamPushService, mediaServerService.getDefaultMediaServer().getId(), (errorStreams, errorGBs)->{
                    logger.info("通道导入成功,存在重复App+Stream为{}个,存在国标ID为{}个", errorStreams.size(), errorGBs.size());
                    RequestMessage msg = new RequestMessage();
                    msg.setKey(key);
                    WVPResult<Map<String, List<String>>> wvpResult = new WVPResult<>();
                    if (errorStreams.size() == 0 && errorGBs.size() == 0) {
                        wvpResult.setCode(0);
                        wvpResult.setMsg("成功");
                    }else {
                        wvpResult.setCode(1);
                        wvpResult.setMsg("导入成功。但是存在重复数据");
                        Map<String, List<String>> errorData = new HashMap<>();
                        errorData.put("gbId", errorGBs);
                        errorData.put("stream", errorStreams);
                        wvpResult.setData(errorData);
                    }
                    msg.setData(wvpResult);
                    resultHolder.invokeAllResult(msg);
        })).build();
        ReadSheet readSheet = EasyExcel.readSheet(0).build();
        excelReader.read(readSheet);
        excelReader.finish();
        return "success";
        return result;
    }
web_src/src/components/PushVideoList.vue
@@ -127,8 +127,6 @@
                        count: that.count
                    }
                }).then(function (res) {
                    console.log(res);
                    console.log(res.data.list);
                    that.total = res.data.total;
                    that.pushList = res.data.list;
                    that.getDeviceListLoading = false;
web_src/src/components/dialog/importChannel.vue
@@ -16,6 +16,8 @@
          drag
          :action="uploadUrl"
          name="file"
          :on-success="successHook"
          :on-error="errorHook"
          >
          <i class="el-icon-upload"></i>
          <div class="el-upload__text">将文件拖到此处,或<em>点击上传</em></div>
@@ -23,14 +25,19 @@
        </el-upload>
      </div>
    </el-dialog>
    <ShowErrorData ref="showErrorData" :gbIds="errorGBIds" :streams="errorStreams" ></ShowErrorData>
  </div>
</template>
<script>
import ShowErrorData from './importChannelShowErrorData.vue'
export default {
  name: "importChannel",
  computed: {},
  components: {
    ShowErrorData,
  },
  created() {},
  data() {
    return {
@@ -38,6 +45,8 @@
      showDialog: false,
      isLoging: false,
      isEdit: false,
      errorStreams: null,
      errorGBIds: null,
      uploadUrl: process.env.NODE_ENV === 'development'?`debug/api/push/upload`:`api/push/upload`,
    };
  },
@@ -73,8 +82,35 @@
    },
    close: function () {
      this.showDialog = false;
      this.$refs.form.resetFields();
    },
    successHook: function(response, file, fileList){
      if (response.code === 0) {
        this.$message({
          showClose: true,
          message: response.msg,
          type: "success",
        });
      }else if (response.code === 1) {
        this.errorGBIds = response.data.gbId
        this.errorStreams = response.data.stream
        console.log(this.$refs)
        console.log(this.$refs.showErrorData)
        this.$refs.showErrorData.openDialog()
      }else {
        this.$message({
          showClose: true,
          message: response.msg,
          type: "error",
        });
      }
    },
    errorHook: function (err, file, fileList) {
      this.$message({
        showClose: true,
        message: err,
        type: "error",
      });
    }
  },
};
</script>
@@ -82,4 +118,8 @@
.upload-box{
  text-align: center;
}
.errDataBox{
  max-height: 15rem;
  overflow: auto;
}
</style>
web_src/src/components/dialog/importChannelShowErrorData.vue
New file
@@ -0,0 +1,64 @@
<template>
  <div id="importChannelShowErrorData" v-loading="isLoging">
    <!-- Lists the duplicated GB ids and app/stream pairs reported by a channel import. -->
    <el-dialog
      title="导入通道数据成功,但数据存在重复"
      width="30rem"
      top="2rem"
      :append-to-body="true"
      :close-on-click-modal="false"
      :visible.sync="showDialog"
      :destroy-on-close="true"
      @close="close()"
    >
      <div >
        重复国标ID:
        <el-button style="float: right;" type="primary" size="mini" icon="el-icon-document-copy"  title="点击拷贝" v-clipboard="gbIdList.join(',')" @success="$message({type:'success', message:'成功拷贝到粘贴板'})">复制</el-button>
        <ul class="errDataBox">
          <!-- Keyed by index: the same id may be listed more than once. -->
          <li v-for="(id, index) in gbIdList" :key="'gb-' + index">
            {{ id }}
          </li>
        </ul>
      </div>
      <div >
        重复App/stream:
        <el-button style="float: right;" type="primary" size="mini" icon="el-icon-document-copy"  title="点击拷贝" v-clipboard="streamList.join(',')" @success="$message({type:'success', message:'成功拷贝到粘贴板'})">复制</el-button>
        <ul class="errDataBox">
          <li v-for="(id, index) in streamList" :key="'stream-' + index">
            {{ id }}
          </li>
        </ul>
      </div>
    </el-dialog>
  </div>
</template>

<script>
/**
 * Dialog that shows the duplicated data found while importing channels.
 *
 * Props:
 *   gbIds   - list of duplicated GB ids
 *   streams - list of duplicated "app/stream" entries
 *
 * The parent opens the dialog through openDialog().
 */
export default {
  name: "importChannelShowErrorData",
  computed: {
    // The parent initialises both props with null, so normalise them here;
    // the template can then always call join() and iterate safely.
    gbIdList: function () {
      return this.gbIds || [];
    },
    streamList: function () {
      return this.streams || [];
    },
  },
  created() {},
  props: {
    gbIds: {
      type: Array,
      default: function () {
        return [];
      },
    },
    streams: {
      type: Array,
      default: function () {
        return [];
      },
    },
  },
  data() {
    return {
      isLoging: false,
      showDialog: false,
    };
  },
  methods: {
    // Makes the dialog visible.
    openDialog: function () {
      this.showDialog = true;
    },
    // Hides the dialog; also bound to the dialog's close event.
    close: function () {
      this.showDialog = false;
    },
  },
};
</script>

<style>
.errDataBox{
  max-height: 15rem;
  overflow: auto;
}
</style>
web_src/src/components/dialog/jessibuca.vue
@@ -78,7 +78,7 @@
          this.jessibuca = new window.Jessibuca(Object.assign(
            {
              container: this.$refs.container,
              videoBuffer: 0.5, // 最大缓冲时长,单位秒
              videoBuffer: 0.2, // 最大缓冲时长,单位秒
              isResize: true,
              isFlv: true,
              decoder: "./static/js/jessibuca/index.js",