src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
@@ -103,12 +103,12 @@
    /**
     * Cancels the scheduled task registered under {@code key}, if there is one and it has
     * not already been cancelled.
     *
     * <p>The future is cancelled with {@code cancel(false)}, i.e. without requesting an
     * interrupt of a run that is already in progress.
     *
     * <p>This method deliberately does not call {@code ISubscribeTask.stop()} on the stored
     * runnable: the callers visible in this change set (SubscribeHolder, DeviceServiceImpl,
     * DeviceQuery) invoke it themselves right before calling this method, so doing it here
     * as well would run the unsubscribe step twice.
     *
     * @param key identifier the task was registered under; unknown keys are ignored
     */
    public void stop(String key) {
        // Read the map once: the previous check-then-act sequence called futureMap.get(key)
        // three times and could dereference null if the entry was removed in between.
        // NOTE(review): assumes the values of futureMap are java.util.concurrent.Future
        // subtypes (they expose isCancelled()/cancel(boolean)) - confirm against the field.
        java.util.concurrent.Future<?> future = futureMap.get(key);
        if (future != null && !future.isCancelled()) {
            future.cancel(false);
        }
    }
src/main/java/com/genersoft/iot/vmp/gb28181/bean/HandlerCatchData.java
New file @@ -0,0 +1,44 @@
package com.genersoft.iot.vmp.gb28181.bean;

import org.dom4j.Element;

import javax.sip.RequestEvent;

/**
 * Mutable holder that groups a SIP request event with the device it belongs to and the
 * XML root element of its body, so the three can be passed around (for example placed on
 * a queue) as a single object.
 *
 * <p>No field is validated; any of them may be {@code null}.
 *
 * @author lin
 */
public class HandlerCatchData {

    /** The SIP request event being carried. */
    private RequestEvent evt;

    /** The device associated with the request; may be {@code null}. */
    private Device device;

    /** Root element of the request's XML content; may be {@code null}. */
    private Element rootElement;

    /**
     * Creates a holder with all three values set.
     *
     * @param evt         the SIP request event
     * @param device      the device associated with the request
     * @param rootElement the XML root element of the request body
     */
    public HandlerCatchData(RequestEvent evt, Device device, Element rootElement) {
        this.rootElement = rootElement;
        this.device = device;
        this.evt = evt;
    }

    // ---- read accessors ----

    /** @return the SIP request event */
    public RequestEvent getEvt() {
        return this.evt;
    }

    /** @return the device associated with the request */
    public Device getDevice() {
        return this.device;
    }

    /** @return the XML root element of the request body */
    public Element getRootElement() {
        return this.rootElement;
    }

    // ---- write accessors ----

    /** @param evt the SIP request event to store */
    public void setEvt(RequestEvent evt) {
        this.evt = evt;
    }

    /** @param device the device to store */
    public void setDevice(Device device) {
        this.device = device;
    }

    /** @param rootElement the XML root element to store */
    public void setRootElement(Element rootElement) {
        this.rootElement = rootElement;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
@@ -2,6 +2,7 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -38,7 +39,6 @@ catalogMap.put(platformId, subscribeInfo); // 添加订阅到期 String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId; dynamicTask.stop(taskOverdueKey); // 添加任务处理订阅过期 dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()), subscribeInfo.getExpires() * 1000); @@ -49,10 +49,17 @@ } public void removeCatalogSubscribe(String platformId) { catalogMap.remove(platformId); String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId; Runnable runnable = dynamicTask.get(taskOverdueKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; subscribeTask.stop(); } // 添加任务处理订阅过期 dynamicTask.stop(taskOverdueKey); } public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo) { @@ -63,7 +70,6 @@ storager, platformId, subscribeInfo.getSn(), key, this, dynamicTask), subscribeInfo.getGpsInterval() * 1000); String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId; dynamicTask.stop(taskOverdueKey); // 添加任务处理订阅过期 dynamicTask.startDelay(taskOverdueKey, () -> { removeMobilePositionSubscribe(subscribeInfo.getId()); @@ -81,6 +87,11 @@ // 结束任务处理GPS定时推送 dynamicTask.stop(key); String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId; Runnable runnable = dynamicTask.get(taskOverdueKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; subscribeTask.stop(); } // 添加任务处理订阅过期 dynamicTask.stop(taskOverdueKey); } 
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
@@ -66,7 +66,7 @@ subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId()); if (subscribe == null) { logger.info("发送订阅消息时发现订阅信息已经不存在"); logger.info("发送订阅消息时发现订阅信息已经不存在: {}", event.getPlatformId()); return; } }else { src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
@@ -150,30 +150,24 @@ public void processTimeout(TimeoutEvent timeoutEvent) { logger.info("[消息发送超时]"); ClientTransaction clientTransaction = timeoutEvent.getClientTransaction(); eventPublisher.requestTimeOut(timeoutEvent); if (clientTransaction != null) { logger.info("[发送错误订阅] clientTransaction != null"); Request request = clientTransaction.getRequest(); if (request != null) { logger.info("[发送错误订阅] request != null"); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); if (callIdHeader != null) { logger.info("[发送错误订阅]"); SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(timeoutEvent); subscribe.response(eventResult); sipSubscribe.removeOkSubscribe(callIdHeader.getCallId()); sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId()); } } } // Timeout timeout = timeoutEvent.getTimeout(); // ServerTransaction serverTransaction = timeoutEvent.getServerTransaction(); // if (serverTransaction != null) { // Request request = serverTransaction.getRequest(); // URI requestURI = request.getRequestURI(); // Header header = request.getHeader(FromHeader.NAME); // } // if(timeoutProcessor != null) { // timeoutProcessor.process(timeoutEvent); // } eventPublisher.requestTimeOut(timeoutEvent); } @Override src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -1487,7 +1487,6 @@ Request request; if (dialog != null) { logger.info("发送移动位置订阅消息时 dialog的状态为: {}", dialog.getState()); request = dialog.createRequest(Request.SUBSCRIBE); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); request.setContent(subscribePostitionXml.toString(), contentTypeHeader); @@ -1583,12 +1582,12 @@ Request request; if (dialog != null) { logger.info("发送目录订阅消息时 dialog的状态为: {}", dialog.getState()); request = dialog.createRequest(Request.SUBSCRIBE); ExpiresHeader expiresHeader = sipFactory.createHeaderFactory().createExpiresHeader(device.getSubscribeCycleForCatalog()); request.setExpires(expiresHeader); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); request.setContent(cmdXml.toString(), contentTypeHeader); ExpiresHeader expireHeader = sipFactory.createHeaderFactory().createExpiresHeader(device.getSubscribeCycleForMobilePosition()); request.addHeader(expireHeader); }else { String tm = Long.toString(System.currentTimeMillis()); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; @@ -26,6 +25,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -36,6 +37,7 @@ import javax.sip.message.Response; import java.text.ParseException; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedQueue; /** * SIP命令类型: NOTIFY请求 @@ -64,10 +66,18 @@ @Autowired private EventPublisher publisher; private String method = "NOTIFY"; private final String method = "NOTIFY"; @Autowired private SIPProcessorObserver sipProcessorObserver; private boolean taskQueueHandlerRun = false; private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; @Override public void afterPropertiesSet() throws Exception { @@ -78,23 +88,40 @@ @Override public void process(RequestEvent evt) { try { Element rootElement = getRootElement(evt); taskQueue.offer(new HandlerCatchData(evt, null, null)); responseAck(evt, Response.OK); if (!taskQueueHandlerRun) { taskQueueHandlerRun = true; taskExecutor.execute(()-> { while (!taskQueue.isEmpty()) { try { HandlerCatchData take = taskQueue.poll(); Element rootElement = getRootElement(take.getEvt()); String cmd = XmlUtil.getText(rootElement, "CmdType"); if (CmdType.CATALOG.equals(cmd)) { logger.info("接收到Catalog通知"); processNotifyCatalogList(evt); 
processNotifyCatalogList(take.getEvt()); } else if (CmdType.ALARM.equals(cmd)) { logger.info("接收到Alarm通知"); processNotifyAlarm(evt); processNotifyAlarm(take.getEvt()); } else if (CmdType.MOBILE_POSITION.equals(cmd)) { logger.info("接收到MobilePosition通知"); processNotifyMobilePosition(evt); processNotifyMobilePosition(take.getEvt()); } else { logger.info("接收到消息:" + cmd); responseAck(evt, Response.OK); } } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { } catch (DocumentException e) { throw new RuntimeException(e); } } taskQueueHandlerRun = false; }); } } catch (SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); } } @@ -167,8 +194,7 @@ jsonObject.put("direction", mobilePosition.getDirection()); jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); responseAck(evt, Response.OK); } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { } catch (DocumentException e) { e.printStackTrace(); } } @@ -189,7 +215,7 @@ Device device = redisCatchStorage.getDevice(deviceId); if (device == null) { responseAck(evt, Response.NOT_FOUND, "device is not found"); logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId); return; } rootElement = getRootElement(evt, device.getCharset()); @@ -199,7 +225,7 @@ deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); if (alarmTime == null) { responseAck(evt, Response.BAD_REQUEST, "AlarmTime cannot be null"); logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); return; } deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); @@ -219,7 +245,7 @@ deviceAlarm.setLatitude(0.00); } logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); if (deviceAlarm.getAlarmMethod().equals("4")) { if ("4".equals(deviceAlarm.getAlarmMethod())) { MobilePosition mobilePosition = new 
MobilePosition(); mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); mobilePosition.setTime(deviceAlarm.getAlarmTime()); @@ -240,11 +266,10 @@ // TODO: 需要实现存储报警信息、报警分类 // 回复200 OK responseAck(evt, Response.OK); if (redisCatchStorage.deviceIsOnline(deviceId)) { publisher.deviceAlarmEventPublish(deviceAlarm); } } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { } catch (DocumentException e) { e.printStackTrace(); } } @@ -280,64 +305,60 @@ continue; } Element eventElement = itemDevice.element("Event"); String event; if (eventElement == null) { logger.warn("[收到 目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" )); event = CatalogEvent.ADD; }else { event = eventElement.getText().toUpperCase(); } DeviceChannel channel = XmlUtil.channelContentHander(itemDevice); channel.setDeviceId(device.getDeviceId()); logger.info("[收到 目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); switch (eventElement.getText().toUpperCase()) { switch (event) { case CatalogEvent.ON: // 上线 logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOnline(deviceId, channel.getChannelId()); // 回复200 OK responseAck(evt, Response.OK); break; case CatalogEvent.OFF : // 离线 logger.info("收到来自设备【{}】的通道【{}】离线通知", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOffline(deviceId, channel.getChannelId()); // 回复200 OK responseAck(evt, Response.OK); break; case CatalogEvent.VLOST: // 视频丢失 logger.info("收到来自设备【{}】的通道【{}】视频丢失通知", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOffline(deviceId, channel.getChannelId()); // 回复200 OK responseAck(evt, Response.OK); break; case CatalogEvent.DEFECT: // 故障 // 回复200 OK responseAck(evt, Response.OK); break; case CatalogEvent.ADD: // 增加 logger.info("收到来自设备【{}】的增加通道【{}】通知", device.getDeviceId(), channel.getChannelId()); storager.updateChannel(deviceId, channel); responseAck(evt, Response.OK); break; case 
CatalogEvent.DEL: // 删除 logger.info("收到来自设备【{}】的删除通道【{}】通知", device.getDeviceId(), channel.getChannelId()); storager.delChannel(deviceId, channel.getChannelId()); responseAck(evt, Response.OK); break; case CatalogEvent.UPDATE: // 更新 logger.info("收到来自设备【{}】的更新通道【{}】通知", device.getDeviceId(), channel.getChannelId()); storager.updateChannel(deviceId, channel); responseAck(evt, Response.OK); break; default: responseAck(evt, Response.BAD_REQUEST, "event not found"); logger.warn("[ NotifyCatalog ] event not found : {}", event ); } // 转发变化信息 eventPublisher.catalogEventPublish(null, channel, eventElement.getText().toUpperCase()); eventPublisher.catalogEventPublish(null, channel, event); } } } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { } catch (DocumentException e) { e.printStackTrace(); } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
@@ -20,6 +20,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -31,6 +33,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @Component public class CatalogResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { @@ -38,8 +41,12 @@ private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class); private final String cmdType = "Catalog"; private boolean taskQueueHandlerRun = false; @Autowired private ResponseMessageHandler responseMessageHandler; private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Autowired private IVideoManagerStorage storager; @@ -63,6 +70,10 @@ @Autowired private IRedisCatchStorage redisCatchStorage; @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -70,23 +81,39 @@ @Override public void handForDevice(RequestEvent evt, Device device, Element element) { String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + device.getDeviceId(); taskQueue.offer(new HandlerCatchData(evt, device, element)); // 回复200 OK try { responseAck(evt, Response.OK); } catch (SipException e) { throw new RuntimeException(e); } catch (InvalidArgumentException e) { throw new RuntimeException(e); } catch (ParseException e) { throw new RuntimeException(e); } if (!taskQueueHandlerRun) { taskQueueHandlerRun = true; taskExecutor.execute(()-> { while (!taskQueue.isEmpty()) { HandlerCatchData take = taskQueue.poll(); 
String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + take.getDevice().getDeviceId(); Element rootElement = null; try { rootElement = getRootElement(evt, device.getCharset()); rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset()); Element deviceListElement = rootElement.element("DeviceList"); Element sumNumElement = rootElement.element("SumNum"); Element snElement = rootElement.element("SN"); if (snElement == null || sumNumElement == null || deviceListElement == null) { responseAck(evt, Response.BAD_REQUEST, "xml error"); responseAck(take.getEvt(), Response.BAD_REQUEST, "xml error"); return; } int sumNum = Integer.parseInt(sumNumElement.getText()); if (sumNum == 0) { // 数据已经完整接收 storager.cleanChannelsForDevice(device.getDeviceId()); catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null); storager.cleanChannelsForDevice(take.getDevice().getDeviceId()); catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); }else { Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); if (deviceListIterator != null) { @@ -103,26 +130,25 @@ // processNotifyMobilePosition(evt, itemDevice); // } DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice); deviceChannel.setDeviceId(device.getDeviceId()); deviceChannel.setDeviceId(take.getDevice().getDeviceId()); channelList.add(deviceChannel); } int sn = Integer.parseInt(snElement.getText()); catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device, channelList); logger.info("收到来自设备【{}】的通道: {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.get(device.getDeviceId()) == null ? 
0 :catalogDataCatch.get(device.getDeviceId()).size(), sumNum); if (catalogDataCatch.get(device.getDeviceId()).size() == sumNum) { catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList); logger.info("收到来自设备【{}】的通道: {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum); if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) { // 数据已经完整接收 boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(device.getDeviceId())); boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId())); if (!resetChannelsResult) { String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(device.getDeviceId()).size() + "条"; catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), errorMsg); String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条"; catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg); }else { catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null); catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); } } } // 回复200 OK responseAck(evt, Response.OK); } } catch (DocumentException e) { e.printStackTrace(); @@ -134,6 +160,11 @@ e.printStackTrace(); } } taskQueueHandlerRun = false; }); } } @Override public void handForPlatform(RequestEvent evt, ParentPlatform parentPlatform, Element rootElement) { src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java
@@ -1,9 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.gb28181.bean.RecordItem; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.RecordDataCatch; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; @@ -19,6 +16,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -28,6 +27,9 @@ import javax.sip.message.Response; import java.text.ParseException; import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; @@ -42,6 +44,9 @@ private final String cmdType = "RecordInfo"; private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_"; private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); private boolean taskQueueHandlerRun = false; @Autowired private ResponseMessageHandler responseMessageHandler; @@ -51,10 +56,12 @@ @Autowired private DeferredResultHolder deferredResultHolder; @Autowired private EventPublisher eventPublisher; @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; @Override public void afterPropertiesSet() throws Exception { @@ -67,25 +74,33 @@ // 回复200 OK try { responseAck(evt, Response.OK); rootElement = getRootElement(evt, 
device.getCharset()); String sn = getText(rootElement, "SN"); taskQueue.offer(new HandlerCatchData(evt, device, rootElement)); if (!taskQueueHandlerRun) { taskQueueHandlerRun = true; taskExecutor.execute(()->{ try { while (!taskQueue.isEmpty()) { HandlerCatchData take = taskQueue.poll(); Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset()); String sn = getText(rootElementForCharset, "SN"); String channelId = getText(rootElementForCharset, "DeviceID"); RecordInfo recordInfo = new RecordInfo(); recordInfo.setDeviceId(device.getDeviceId()); recordInfo.setChannelId(channelId); recordInfo.setDeviceId(take.getDevice().getDeviceId()); recordInfo.setSn(sn); recordInfo.setName(getText(rootElement, "Name")); String sumNumStr = getText(rootElement, "SumNum"); recordInfo.setName(getText(rootElementForCharset, "Name")); String sumNumStr = getText(rootElementForCharset, "SumNum"); int sumNum = 0; if (!StringUtils.isEmpty(sumNumStr)) { sumNum = Integer.parseInt(sumNumStr); } recordInfo.setSumNum(sumNum); Element recordListElement = rootElement.element("RecordList"); Element recordListElement = rootElementForCharset.element("RecordList"); if (recordListElement == null || sumNum == 0) { logger.info("无录像数据"); eventPublisher.recordEndEventPush(recordInfo); recordDataCatch.put(device.getDeviceId(), sn, sumNum, new ArrayList<>()); releaseRequest(device.getDeviceId(), sn); recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, new ArrayList<>()); releaseRequest(take.getDevice().getDeviceId(), sn); } else { Iterator<Element> recordListIterator = recordListElement.elementIterator(); if (recordListIterator != null) { @@ -120,21 +135,27 @@ recordInfo.setRecordList(recordList); // 发送消息,如果是上级查询此录像,则会通过这里通知给上级 eventPublisher.recordEndEventPush(recordInfo); int count = recordDataCatch.put(device.getDeviceId(), sn, sumNum, recordList); logger.info("[国标录像], {}->{}: {}/{}", device.getDeviceId(), sn, count, sumNum); int count = 
recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, recordList); logger.info("[国标录像], {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum); } if (recordDataCatch.isComplete(device.getDeviceId(), sn)){ releaseRequest(device.getDeviceId(), sn); if (recordDataCatch.isComplete(take.getDevice().getDeviceId(), sn)){ releaseRequest(take.getDevice().getDeviceId(), sn); } } } taskQueueHandlerRun = false; }catch (DocumentException e) { throw new RuntimeException(e); } }); } } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } catch (DocumentException e) { e.printStackTrace(); } } src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; import com.genersoft.iot.vmp.service.IDeviceService; @@ -95,7 +96,6 @@ } // 刷新过期任务 String registerExpireTaskKey = registerExpireTaskKeyPrefix + device.getDeviceId(); dynamicTask.stop(registerExpireTaskKey); dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId()), device.getExpires() * 1000); } @@ -144,8 +144,16 @@ if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } logger.info("移除目录订阅: {}", device.getDeviceId()); dynamicTask.stop(device.getDeviceId() + "catalog"); logger.info("[移除目录订阅]: {}", device.getDeviceId()); String taskKey = device.getDeviceId() + "catalog"; if (device.getOnline() == 1) { Runnable runnable = dynamicTask.get(taskKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; subscribeTask.stop(); } } dynamicTask.stop(taskKey); return true; } @@ -169,8 +177,16 @@ if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } logger.info("移除移动位置订阅: {}", device.getDeviceId()); dynamicTask.stop(device.getDeviceId() + "mobile_position"); logger.info("[移除移动位置订阅]: {}", device.getDeviceId()); String taskKey = device.getDeviceId() + "mobile_position"; if (device.getOnline() == 1) { Runnable runnable = dynamicTask.get(taskKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; subscribeTask.stop(); } } dynamicTask.stop(taskKey); return true; } src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
@@ -206,6 +206,11 @@ Set<String> allKeys = dynamicTask.getAllKeys(); for (String key : allKeys) { if (key.startsWith(deviceId)) { Runnable runnable = dynamicTask.get(key); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; subscribeTask.stop(); } dynamicTask.stop(key); } }