处理服务重启或设备重新上线时的订阅,优化通道导入重复的处理
| | |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.context.annotation.Bean; |
| | | import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; |
| | | import org.springframework.scheduling.support.CronTrigger; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.util.Map; |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Reports whether {@code futureMap} currently holds a non-null entry for the given key. |
| | | * |
| | | * @param key the key to look up in {@code futureMap} |
| | | * @return {@code true} when {@code futureMap.get(key)} is not {@code null}, {@code false} otherwise |
| | | */ |
| | | public boolean contains(String key) { |
| | | return futureMap.get(key) != null; |
| | | } |
| | | |
| | | } |
| | |
| | | package com.genersoft.iot.vmp.conf.runner; |
| | | |
| | | import com.genersoft.iot.vmp.common.VideoManagerConstants; |
| | | import com.genersoft.iot.vmp.conf.UserSetup; |
| | | import com.genersoft.iot.vmp.gb28181.bean.Device; |
| | | import com.genersoft.iot.vmp.service.IDeviceService; |
| | | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
| | | import com.genersoft.iot.vmp.storager.IVideoManagerStorager; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | |
| | | @Autowired |
| | | private UserSetup userSetup; |
| | | |
| | | @Autowired |
| | | private IDeviceService deviceService; |
| | | |
| | | @Override |
| | | public void run(String... args) throws Exception { |
| | | // 读取redis没有心跳信息的则设置为离线,等收到下次心跳设置为在线 |
| | |
| | | List<String> onlineForAll = redisCatchStorage.getOnlineForAll(); |
| | | for (String deviceId : onlineForAll) { |
| | | storager.online(deviceId); |
| | | Device device = redisCatchStorage.getDevice(deviceId); |
| | | if (device != null && device.getSubscribeCycleForCatalog() > 0) { |
| | | // 查询在线设备那些开启了订阅,为设备开启定时的目录订阅 |
| | | deviceService.addCatalogSubscribe(device); |
| | | } |
| | | } |
| | | // 重置cseq计数 |
| | | redisCatchStorage.resetAllCSEQ(); |
| | | // TODO 查询在线设备那些开启了订阅,为设备开启定时的目录订阅 |
| | | |
| | | |
| | | } |
| | | } |
| | |
| | |
|
| | | @Autowired
|
| | | private IVideoManagerStorager storager;
|
| | |
|
| | | @Autowired
|
| | | private IDeviceService deviceService;
|
| | |
|
| | | @Autowired
|
| | | private RedisUtil redis;
|
| | |
| | | logger.debug("设备上线事件触发,deviceId:" + event.getDevice().getDeviceId() + ",from:" + event.getFrom());
|
| | | }
|
| | | Device device = event.getDevice();
|
| | | if (device == null) return;
|
| | | String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + userSetup.getServerId() + "_" + event.getDevice().getDeviceId();
|
| | |
|
| | | switch (event.getFrom()) {
|
| | |
| | | }
|
| | |
|
| | | device.setOnline(1);
|
| | | Device deviceInstore = storager.queryVideoDevice(device.getDeviceId());
|
| | | if (deviceInstore != null && deviceInstore.getOnline() == 0) {
|
| | | Device deviceInStore = storager.queryVideoDevice(device.getDeviceId());
|
| | | if (deviceInStore != null && deviceInStore.getOnline() == 0) {
|
| | | List<DeviceChannel> deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId());
|
| | | eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON);
|
| | | }
|
| | | // 处理上线监听
|
| | | storager.updateDevice(device);
|
| | |
|
| | | // TODO 上线添加订阅
|
| | | // 上线添加订阅
|
| | | if (device.getSubscribeCycleForCatalog() > 0) {
|
| | | deviceService.addCatalogSubscribe(device);
|
| | | }
|
| | |
|
| | | }
|
| | | }
|
| | |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.context.ApplicationListener; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.StringUtils; |
| | | |
| | | import java.util.*; |
| | | |
| | |
| | | }else if (event.getGbStreams() != null) { |
| | | if (platforms.size() > 0) { |
| | | for (GbStream gbStream : event.getGbStreams()) { |
| | | if (gbStream == null || StringUtils.isEmpty(gbStream.getGbId())) continue; |
| | | List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForStreamWithGBId(gbStream.getApp(),gbStream.getStream(), platforms); |
| | | parentPlatformMap.put(gbStream.getGbId(), parentPlatformsForGB); |
| | | } |
| | | } |
| | | } |
| | | |
| | | } |
| | | switch (event.getType()) { |
| | | case CatalogEvent.ON: |
| | |
| | |
|
| | | public static final String CALLBACK_CMD_STOP = "CALLBACK_STOP";
|
| | |
|
| | | public static final String UPLOAD_FILE_CHANNEL = "UPLOAD_FILE_CHANNEL";
|
| | |
|
| | | public static final String CALLBACK_CMD_MOBILEPOSITION = "CALLBACK_MOBILEPOSITION";
|
| | |
|
| | | public static final String CALLBACK_CMD_PRESETQUERY = "CALLBACK_PRESETQUERY";
|
| | |
| | | Element rootElement = getRootElement(evt); |
| | | String cmd = XmlUtil.getText(rootElement, "CmdType"); |
| | | if (CmdType.MOBILE_POSITION.equals(cmd)) { |
| | | logger.info("接收到MobilePosition订阅"); |
| | | processNotifyMobilePosition(evt, rootElement); |
| | | // } else if (CmdType.ALARM.equals(cmd)) { |
| | | // logger.info("接收到Alarm订阅"); |
| | | // processNotifyAlarm(evt, rootElement); |
| | | } else if (CmdType.CATALOG.equals(cmd)) { |
| | | logger.info("接收到Catalog订阅"); |
| | | processNotifyCatalogList(evt, rootElement); |
| | | } else { |
| | | logger.info("接收到消息:" + cmd); |
| | | // responseAck(evt, Response.OK); |
| | | |
| | | Response response = null; |
| | | response = getMessageFactory().createResponse(200, request); |
| | |
| | | SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId); |
| | | String sn = XmlUtil.getText(rootElement, "SN"); |
| | | String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_MobilePosition_" + platformId; |
| | | |
| | | logger.info("接收到{}的MobilePosition订阅", platformId); |
| | | StringBuilder resultXml = new StringBuilder(200); |
| | | resultXml.append("<?xml version=\"1.0\" ?>\r\n") |
| | | .append("<Response>\r\n") |
| | |
| | | SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId); |
| | | String sn = XmlUtil.getText(rootElement, "SN"); |
| | | String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platformId; |
| | | |
| | | logger.info("接收到{}的Catalog订阅", platformId); |
| | | StringBuilder resultXml = new StringBuilder(200); |
| | | resultXml.append("<?xml version=\"1.0\" ?>\r\n") |
| | | .append("<Response>\r\n") |
| | |
| | | if (device == null || device.getSubscribeCycleForCatalog() < 0) { |
| | | return false; |
| | | } |
| | | if (dynamicTask.contains(device.getDeviceId())) { |
| | | logger.info("[添加目录订阅] 设备{}的目录订阅以存在", device.getDeviceId()); |
| | | return false; |
| | | } |
| | | logger.info("[添加目录订阅] 设备{}", device.getDeviceId()); |
| | | // 添加目录订阅 |
| | | CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander); |
| | | catalogSubscribeTask.run(); |
| | | // 提前开始刷新订阅 |
| | | // TODO 使用jain sip的当时刷新订阅 |
| | | int subscribeCycleForCatalog = device.getSubscribeCycleForCatalog(); |
| | | // 设置最小值为30 |
| | | subscribeCycleForCatalog = Math.max(subscribeCycleForCatalog, 30); |
| | |
| | | |
| | | public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPushExcelDto> { |
| | | |
| | | private ErrorDataHandler errorDataHandler; |
| | | private IStreamPushService pushService; |
| | | private String defaultMediaServerId; |
| | | private List<StreamPushItem> streamPushItems = new ArrayList<>(); |
| | | private Set<String> streamPushStreamSet = new HashSet<>(); |
| | | private Set<String> streamPushGBSet = new HashSet<>(); |
| | | private List<String> errorStreamList = new ArrayList<>(); |
| | | private List<String> errorGBList = new ArrayList<>(); |
| | | |
| | | public StreamPushUploadFileHandler(IStreamPushService pushService, String defaultMediaServerId) { |
| | | public StreamPushUploadFileHandler(IStreamPushService pushService, String defaultMediaServerId, ErrorDataHandler errorDataHandler) { |
| | | this.pushService = pushService; |
| | | this.defaultMediaServerId = defaultMediaServerId; |
| | | this.errorDataHandler = errorDataHandler; |
| | | } |
| | | |
| | | /** |
| | | * Callback that receives the duplicate entries collected while the upload was parsed. |
| | | * It is invoked from {@code doAfterAllAnalysed} with {@code errorStreamList} and {@code errorGBList}. |
| | | */ |
| | | public interface ErrorDataHandler{ |
| | | /** |
| | | * @param streams "app/stream" labels recorded for rows whose app+stream was found in {@code streamPushStreamSet} |
| | | * @param gbId GB ids recorded for rows whose GB id was found in {@code streamPushGBSet} |
| | | */ |
| | | void handle(List<String> streams, List<String> gbId); |
| | | } |
| | | |
| | | @Override |
| | |
| | | || StringUtils.isEmpty(streamPushExcelDto.getGbId())) { |
| | | return; |
| | | } |
| | | if (streamPushGBSet.contains(streamPushExcelDto.getGbId())) { |
| | | errorGBList.add(streamPushExcelDto.getGbId()); |
| | | } |
| | | if (streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream())) { |
| | | errorStreamList.add(streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream()); |
| | | } |
| | | if (streamPushGBSet.contains(streamPushExcelDto.getGbId()) || streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream())) { |
| | | return; |
| | | } |
| | | |
| | | StreamPushItem streamPushItem = new StreamPushItem(); |
| | | streamPushItem.setApp(streamPushExcelDto.getApp()); |
| | | streamPushItem.setStream(streamPushExcelDto.getStream()); |
| | |
| | | /** |
| | | * End-of-parse hook: stores the collected items, clears the duplicate-detection sets |
| | | * and hands the recorded duplicates to the error handler. |
| | | * |
| | | * @param analysisContext context supplied by the Excel reader; not used here |
| | | */ |
| | | @Override |
| | | public void doAfterAllAnalysed(AnalysisContext analysisContext) { |
| | | // Save here as well so that the remaining collected rows also reach the database. |
| | | // batchAdd runs exactly once, and only when there is something to insert; the earlier |
| | | // unconditional call duplicated the insert and also fired for an empty list. |
| | | if (!streamPushItems.isEmpty()) { |
| | | pushService.batchAdd(streamPushItems); |
| | | } |
| | | streamPushGBSet.clear(); |
| | | streamPushStreamSet.clear(); |
| | | // Report the duplicates last, after the data has been stored. |
| | | errorDataHandler.handle(errorStreamList, errorGBList); |
| | | } |
| | | } |
| | |
| | | StreamPushItem selectOne(String app, String stream); |
| | | |
| | | @Insert("<script>" + |
| | | "INSERT INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " + |
| | | "REPLACE INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " + |
| | | "createStamp, aliveSecond, mediaServerId) " + |
| | | "VALUES <foreach collection='streamPushItems' item='item' index='index' separator=','>" + |
| | | "( '${item.app}', '${item.stream}', '${item.totalReaderCount}', '${item.originType}', " + |
| | |
| | | import com.alibaba.excel.ExcelReader; |
| | | import com.alibaba.excel.read.metadata.ReadSheet; |
| | | import com.genersoft.iot.vmp.gb28181.bean.GbStream; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; |
| | | import com.genersoft.iot.vmp.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.service.IStreamPushService; |
| | | import com.genersoft.iot.vmp.service.impl.StreamPushUploadFileHandler; |
| | | import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; |
| | | import com.genersoft.iot.vmp.vmanager.bean.WVPResult; |
| | | import com.github.pagehelper.PageInfo; |
| | | import io.swagger.annotations.Api; |
| | | import io.swagger.annotations.ApiImplicitParam; |
| | |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.http.HttpStatus; |
| | | import org.springframework.http.ResponseEntity; |
| | | import org.springframework.stereotype.Controller; |
| | | import org.springframework.web.bind.annotation.*; |
| | | import org.springframework.web.context.request.async.DeferredResult; |
| | | import org.springframework.web.multipart.MultipartFile; |
| | | |
| | | import java.io.IOException; |
| | | import java.io.InputStream; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.UUID; |
| | | |
| | | @Api(tags = "推流信息管理") |
| | | @Controller |
| | |
| | | |
| | | @Autowired |
| | | private IMediaServerService mediaServerService; |
| | | |
| | | @Autowired |
| | | private DeferredResultHolder resultHolder; |
| | | |
| | | @ApiOperation("推流列表查询") |
| | | @ApiImplicitParams({ |
| | |
| | | } |
| | | @PostMapping(value = "upload") |
| | | @ResponseBody |
| | | public String uploadChannelFile(@RequestParam(value = "file") MultipartFile file){ |
| | | public DeferredResult<ResponseEntity<WVPResult<Object>>> uploadChannelFile(@RequestParam(value = "file") MultipartFile file){ |
| | | |
| | | |
| | | // 最多处理文件一个小时 |
| | | DeferredResult<ResponseEntity<WVPResult<Object>>> result = new DeferredResult<>(60*60*1000L); |
| | | // 录像查询以channelId作为deviceId查询 |
| | | String key = DeferredResultHolder.UPLOAD_FILE_CHANNEL; |
| | | String uuid = UUID.randomUUID().toString(); |
| | | |
| | | if (file.isEmpty()) { |
| | | return "fail"; |
| | | logger.warn("通道导入文件为空"); |
| | | WVPResult<Object> wvpResult = new WVPResult<>(); |
| | | wvpResult.setCode(-1); |
| | | wvpResult.setMsg("文件为空"); |
| | | result.setResult(ResponseEntity.status(HttpStatus.BAD_REQUEST).body(wvpResult)); |
| | | return result; |
| | | } |
| | | // 同时只处理一个文件 |
| | | if (resultHolder.exist(key, null)) { |
| | | logger.warn("已有导入任务正在执行"); |
| | | WVPResult<Object> wvpResult = new WVPResult<>(); |
| | | wvpResult.setCode(-1); |
| | | wvpResult.setMsg("已有导入任务正在执行"); |
| | | result.setResult(ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(wvpResult)); |
| | | return result; |
| | | } |
| | | |
| | | resultHolder.put(key, uuid, result); |
| | | result.onTimeout(()->{ |
| | | logger.warn("通道导入超时,可能文件过大"); |
| | | RequestMessage msg = new RequestMessage(); |
| | | msg.setKey(key); |
| | | WVPResult<Object> wvpResult = new WVPResult<>(); |
| | | wvpResult.setCode(-1); |
| | | wvpResult.setMsg("导入超时,可能文件过大"); |
| | | msg.setData(wvpResult); |
| | | resultHolder.invokeAllResult(msg); |
| | | }); |
| | | //获取文件流 |
| | | InputStream inputStream = null; |
| | | try { |
| | |
| | | } |
| | | //传入参数 |
| | | ExcelReader excelReader = EasyExcel.read(inputStream, StreamPushExcelDto.class, |
| | | new StreamPushUploadFileHandler(streamPushService, mediaServerService.getDefaultMediaServer().getId())).build(); |
| | | new StreamPushUploadFileHandler(streamPushService, mediaServerService.getDefaultMediaServer().getId(), (errorStreams, errorGBs)->{ |
| | | logger.info("通道导入成功,存在重复App+Stream为{}个,存在国标ID为{}个", errorStreams.size(), errorGBs.size()); |
| | | RequestMessage msg = new RequestMessage(); |
| | | msg.setKey(key); |
| | | WVPResult<Map<String, List<String>>> wvpResult = new WVPResult<>(); |
| | | if (errorStreams.size() == 0 && errorGBs.size() == 0) { |
| | | wvpResult.setCode(0); |
| | | wvpResult.setMsg("成功"); |
| | | }else { |
| | | wvpResult.setCode(1); |
| | | wvpResult.setMsg("导入成功。但是存在重复数据"); |
| | | Map<String, List<String>> errorData = new HashMap<>(); |
| | | errorData.put("gbId", errorGBs); |
| | | errorData.put("stream", errorStreams); |
| | | wvpResult.setData(errorData); |
| | | } |
| | | msg.setData(wvpResult); |
| | | resultHolder.invokeAllResult(msg); |
| | | })).build(); |
| | | ReadSheet readSheet = EasyExcel.readSheet(0).build(); |
| | | excelReader.read(readSheet); |
| | | excelReader.finish(); |
| | | return "success"; |
| | | return result; |
| | | } |
| | | |
| | | |
| | |
| | | count: that.count |
| | | } |
| | | }).then(function (res) { |
| | | console.log(res); |
| | | console.log(res.data.list); |
| | | that.total = res.data.total; |
| | | that.pushList = res.data.list; |
| | | that.getDeviceListLoading = false; |
| | |
| | | drag |
| | | :action="uploadUrl" |
| | | name="file" |
| | | :on-success="successHook" |
| | | :on-error="errorHook" |
| | | > |
| | | <i class="el-icon-upload"></i> |
| | | <div class="el-upload__text">将文件拖到此处,或<em>点击上传</em></div> |
| | |
| | | </el-upload> |
| | | </div> |
| | | </el-dialog> |
| | | <ShowErrorData ref="showErrorData" :gbIds="errorGBIds" :streams="errorStreams" ></ShowErrorData> |
| | | </div> |
| | | </template> |
| | | |
| | | <script> |
| | | |
| | | import ShowErrorData from './importChannelShowErrorData.vue' |
| | | |
| | | export default { |
| | | name: "importChannel", |
| | | computed: {}, |
| | | components: { |
| | | ShowErrorData, |
| | | }, |
| | | created() {}, |
| | | data() { |
| | | return { |
| | |
| | | showDialog: false, |
| | | isLoging: false, |
| | | isEdit: false, |
| | | errorStreams: null, |
| | | errorGBIds: null, |
| | | uploadUrl: process.env.NODE_ENV === 'development'?`debug/api/push/upload`:`api/push/upload`, |
| | | }; |
| | | }, |
| | |
| | | }, |
| | | close: function () { |
| | | this.showDialog = false; |
| | | this.$refs.form.resetFields(); |
| | | }, |
| | | successHook: function(response, file, fileList){ |
| | | if (response.code === 0) { |
| | | this.$message({ |
| | | showClose: true, |
| | | message: response.msg, |
| | | type: "success", |
| | | }); |
| | | }else if (response.code === 1) { |
| | | this.errorGBIds = response.data.gbId |
| | | this.errorStreams = response.data.stream |
| | | console.log(this.$refs) |
| | | console.log(this.$refs.showErrorData) |
| | | this.$refs.showErrorData.openDialog() |
| | | }else { |
| | | this.$message({ |
| | | showClose: true, |
| | | message: response.msg, |
| | | type: "error", |
| | | }); |
| | | } |
| | | }, |
| | | errorHook: function (err, file, fileList) { |
| | | this.$message({ |
| | | showClose: true, |
| | | message: err, |
| | | type: "error", |
| | | }); |
| | | } |
| | | }, |
| | | }; |
| | | </script> |
| | |
| | | .upload-box{ |
| | | text-align: center; |
| | | } |
| | | .errDataBox{ |
| | | max-height: 15rem; |
| | | overflow: auto; |
| | | } |
| | | </style> |
New file |
| | |
| | | <template> |
| | | <div id="importChannelShowErrorData" v-loading="isLoging"> |
| | | <el-dialog |
| | | title="导入通道数据成功,但数据存在重复" |
| | | width="30rem" |
| | | top="2rem" |
| | | :append-to-body="true" |
| | | :close-on-click-modal="false" |
| | | :visible.sync="showDialog" |
| | | :destroy-on-close="true" |
| | | @close="close()" |
| | | > |
| | | <div > |
| | | 重复国标ID: |
| | | <el-button style="float: right;" type="primary" size="mini" icon="el-icon-document-copy" title="点击拷贝" v-clipboard="gbIds.join(',')" @success="$message({type:'success', message:'成功拷贝到粘贴板'})">复制</el-button> |
| | | <ul class="errDataBox"> |
| | | <li v-for="id in gbIds" > |
| | | {{ id }} |
| | | </li> |
| | | </ul> |
| | | </div> |
| | | |
| | | <div > |
| | | 重复App/stream: |
| | | <el-button style="float: right;" type="primary" size="mini" icon="el-icon-document-copy" title="点击拷贝" v-clipboard="streams.join(',')" @success="$message({type:'success', message:'成功拷贝到粘贴板'})">复制</el-button> |
| | | <ul class="errDataBox"> |
| | | <li v-for="id in streams" > |
| | | {{ id }} |
| | | </li> |
| | | </ul> |
| | | </div> |
| | | </el-dialog> |
| | | </div> |
| | | </template> |
| | | |
| | | <script> |
| | | |
| | | // Dialog component that lists the duplicated GB ids and app/stream entries received as props. |
| | | export default { |
| | | name: "importChannelShowErrorData", |
| | | computed: {}, |
| | | created() {}, |
| | | // Both props are iterated with v-for and joined with ',' for the copy buttons in the template. |
| | | props: ['gbIds', 'streams'], |
| | | data() { |
| | | return { |
| | | // Bound to v-loading on the root element. |
| | | isLoging: false, |
| | | // Bound to the dialog's :visible.sync. |
| | | showDialog: false, |
| | | }; |
| | | }, |
| | | methods: { |
| | | // Shows the dialog. |
| | | openDialog: function () { |
| | | this.showDialog = true; |
| | | }, |
| | | // Hides the dialog; also wired to the dialog's @close event. |
| | | close: function () { |
| | | this.showDialog = false; |
| | | }, |
| | | }, |
| | | }; |
| | | </script> |
| | | <style> |
| | | .errDataBox{ |
| | | max-height: 15rem; |
| | | overflow: auto; |
| | | } |
| | | </style> |
| | |
| | | this.jessibuca = new window.Jessibuca(Object.assign( |
| | | { |
| | | container: this.$refs.container, |
| | | videoBuffer: 0.5, // 最大缓冲时长,单位秒 |
| | | videoBuffer: 0.2, // 最大缓冲时长,单位秒 |
| | | isResize: true, |
| | | isFlv: true, |
| | | decoder: "./static/js/jessibuca/index.js", |