src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
@@ -1,7 +1,5 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.genersoft.iot.vmp.conf.CivilCodeFileConf; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -20,10 +18,10 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -39,8 +37,6 @@ private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class); private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>(); private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>(); private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>(); @@ -60,274 +56,110 @@ private IDeviceChannelService deviceChannelService; @Autowired private DynamicTask dynamicTask; @Autowired private CivilCodeFileConf civilCodeFileConf; @Autowired private SipConfig sipConfig; private final static String talkKey = "notify-request-for-catalog-task"; @Transactional public void process(List<RequestEvent> evtList) { if (evtList.isEmpty()) { return; } for (RequestEvent evt : evtList) { try { long start = System.currentTimeMillis(); FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); public void process(RequestEvent evt) { try { long start = System.currentTimeMillis(); FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = 
SipUtils.getUserIdFromFromHeader(fromHeader); Device device = redisCatchStorage.getDevice(deviceId); if (device == null || !device.isOnLine()) { logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" )); return; } Element rootElement = getRootElement(evt, device.getCharset()); if (rootElement == null) { logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest()); return; } Element deviceListElement = rootElement.element("DeviceList"); if (deviceListElement == null) { return; } Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); if (deviceListIterator != null) { Device device = redisCatchStorage.getDevice(deviceId); if (device == null || !device.isOnLine()) { logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" )); return; } Element rootElement = getRootElement(evt, device.getCharset()); if (rootElement == null) { logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest()); return; } Element deviceListElement = rootElement.element("DeviceList"); if (deviceListElement == null) { return; } Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); if (deviceListIterator != null) { // 遍历DeviceList while (deviceListIterator.hasNext()) { Element itemDevice = deviceListIterator.next(); Element eventElement = itemDevice.element("Event"); String event; if (eventElement == null) { logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? 
device.getDeviceId():"" )); event = CatalogEvent.ADD; }else { event = eventElement.getText().toUpperCase(); } DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); if (channel == null) { logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); continue; } if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { channel.setParentId(null); } channel.setDeviceId(device.getDeviceId()); logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); switch (event) { case CatalogEvent.ON: // 上线 logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); updateChannelOnlineList.add(channel); if (updateChannelOnlineList.size() > 300) { executeSaveForOnline(); } if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); } break; case CatalogEvent.OFF : // 离线 logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); if (userSetting.getRefuseChannelStatusChannelFormNotify()) { logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); }else { updateChannelOfflineList.add(channel); if (updateChannelOfflineList.size() > 300) { executeSaveForOffline(); } if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); } } break; case CatalogEvent.VLOST: // 视频丢失 logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); if (userSetting.getRefuseChannelStatusChannelFormNotify()) { logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); }else { updateChannelOfflineList.add(channel); if (updateChannelOfflineList.size() > 300) { executeSaveForOffline(); } if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); } } break; case CatalogEvent.DEFECT: // 故障 logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); if (userSetting.getRefuseChannelStatusChannelFormNotify()) { logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); }else { updateChannelOfflineList.add(channel); if (updateChannelOfflineList.size() > 300) { executeSaveForOffline(); } if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); } } break; case CatalogEvent.ADD: // 增加 logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); // 判断此通道是否存在 DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId()); if (deviceChannel != null) { logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); channel.setId(deviceChannel.getId()); updateChannelMap.put(channel.getChannelId(), channel); if (updateChannelMap.keySet().size() > 300) { executeSaveForUpdate(); } }else { addChannelMap.put(channel.getChannelId(), channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); } if (addChannelMap.keySet().size() > 300) { executeSaveForAdd(); } } break; case CatalogEvent.DEL: // 删除 logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); deleteChannelList.add(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); } if (deleteChannelList.size() > 300) { executeSaveForDelete(); } break; case CatalogEvent.UPDATE: // 更新 logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); // 判断此通道是否存在 
DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId()); if (deviceChannelForUpdate != null) { channel.setId(deviceChannelForUpdate.getId()); channel.setUpdateTime(DateUtil.getNow()); updateChannelMap.put(channel.getChannelId(), channel); if (updateChannelMap.keySet().size() > 300) { executeSaveForUpdate(); } }else { addChannelMap.put(channel.getChannelId(), channel); if (addChannelMap.keySet().size() > 300) { executeSaveForAdd(); } if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); } } break; default: logger.warn("[ NotifyCatalog ] event not found : {}", event ); } // 转发变化信息 eventPublisher.catalogEventPublish(null, channel, event); if (!updateChannelMap.keySet().isEmpty() || !addChannelMap.keySet().isEmpty() || !updateChannelOnlineList.isEmpty() || !updateChannelOfflineList.isEmpty() || !deleteChannelList.isEmpty()) { if (!dynamicTask.contains(talkKey)) { dynamicTask.startDelay(talkKey, this::executeSave, 1000); // 遍历DeviceList while (deviceListIterator.hasNext()) { Element itemDevice = deviceListIterator.next(); Element eventElement = itemDevice.element("Event"); String event; if (eventElement == null) { logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? 
device.getDeviceId():"" )); event = CatalogEvent.ADD; }else { event = eventElement.getText().toUpperCase(); } DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); if (channel == null) { logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); continue; } if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { channel.setParentId(null); } channel.setDeviceId(device.getDeviceId()); logger.info("[收到目录订阅]:{}, {}/{}",event, device.getDeviceId(), channel.getChannelId()); switch (event) { case CatalogEvent.ON: // 上线 deviceChannelService.online(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); } break; case CatalogEvent.OFF : case CatalogEvent.VLOST: case CatalogEvent.DEFECT: // 离线 if (userSetting.getRefuseChannelStatusChannelFormNotify()) { logger.info("[目录订阅] 离线 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); }else { deviceChannelService.offline(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); } } break; case CatalogEvent.DEL: // 删除 deviceChannelService.delete(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); } break; case CatalogEvent.ADD: case CatalogEvent.UPDATE: // 更新 channel.setUpdateTime(DateUtil.getNow()); deviceChannelService.updateChannel(deviceId,channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); } break; default: logger.warn("[ NotifyCatalog ] event not found : {}", event ); } // 转发变化信息 eventPublisher.catalogEventPublish(null, channel, event); } } } catch (DocumentException e) { logger.error("未处理的异常 ", e); } 
} catch (DocumentException e) { logger.error("未处理的异常 ", e); } } // TODO 同一个通道如果先发送更新再发送离线可能无法正常离线 private void executeSave(){ try { executeSaveForAdd(); } catch (Exception e) { logger.error("[存储收到的增加通道] 异常: ", e ); } try { executeSaveForOnline(); } catch (Exception e) { logger.error("[存储收到的通道上线] 异常: ", e ); } try { executeSaveForOffline(); } catch (Exception e) { logger.error("[存储收到的通道离线] 异常: ", e ); } try { executeSaveForUpdate(); } catch (Exception e) { logger.error("[存储收到的更新通道] 异常: ", e ); } try { executeSaveForDelete(); } catch (Exception e) { logger.error("[存储收到的删除通道] 异常: ", e ); } dynamicTask.stop(talkKey); } private void executeSaveForUpdate(){ if (!updateChannelMap.values().isEmpty()) { logger.info("[存储收到的更新通道], 数量: {}", updateChannelMap.size()); ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); deviceChannelService.batchUpdateChannel(deviceChannels); updateChannelMap.clear(); } } private void executeSaveForAdd(){ if (!addChannelMap.values().isEmpty()) { ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values()); addChannelMap.clear(); deviceChannelService.batchAddChannel(deviceChannels); } } private void executeSaveForDelete(){ if (!deleteChannelList.isEmpty()) { deviceChannelService.deleteChannels(deleteChannelList); deleteChannelList.clear(); } } private void executeSaveForOnline(){ if (!updateChannelOnlineList.isEmpty()) { deviceChannelService.channelsOnline(updateChannelOnlineList); updateChannelOnlineList.clear(); } } private void executeSaveForOffline(){ if (!updateChannelOfflineList.isEmpty()) { deviceChannelService.channelsOffline(updateChannelOfflineList); updateChannelOfflineList.clear(); } } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -1,8 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.genersoft.iot.vmp.conf.CivilCodeFileConf; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; @@ -19,8 +17,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; @@ -29,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; /** * SIP命令类型: NOTIFY请求中的移动位置请求处理 @@ -39,10 +36,6 @@ private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class); private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); private final List<MobilePosition> addMobilePositionList = new CopyOnWriteArrayList(); @Autowired @@ -57,180 +50,150 @@ @Autowired private IDeviceChannelService deviceChannelService; @Autowired private DynamicTask dynamicTask; @Autowired private CivilCodeFileConf civilCodeFileConf; @Autowired private SipConfig sipConfig; private final static String talkKey = "notify-request-for-mobile-position-task"; @Async("taskExecutor") public void process(RequestEvent evt) { try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); long startTime = System.currentTimeMillis(); // 回复 200 OK Element rootElement = getRootElement(evt); if (rootElement == null) { 
logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest()); return; } Device device = redisCatchStorage.getDevice(deviceId); MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); List<Element> elements = rootElement.elements(); String channelId = null; for (Element element : elements) { switch (element.getName()){ case "DeviceID": channelId = element.getStringValue(); if (device == null) { device = redisCatchStorage.getDevice(channelId); if (device == null) { // 根据通道id查询设备Id List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId); if (!deviceList.isEmpty()) { device = deviceList.get(0); } } } if (device == null) { logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId); return; } mobilePosition.setDeviceId(device.getDeviceId()); mobilePosition.setChannelId(channelId); // 兼容设备部分设备上报是通道编号与设备编号一致的情况 if (deviceId.equals(channelId)) { List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); if (deviceChannels.size() == 1) { channelId = deviceChannels.get(0).getChannelId(); } } if (!ObjectUtils.isEmpty(device.getName())) { mobilePosition.setDeviceName(device.getName()); } mobilePosition.setDeviceId(device.getDeviceId()); mobilePosition.setChannelId(channelId); continue; case "Time": String timeVal = element.getStringValue(); if (ObjectUtils.isEmpty(timeVal)) { mobilePosition.setTime(DateUtil.getNow()); } else { mobilePosition.setTime(SipUtils.parseTime(timeVal)); } continue; case "Longitude": mobilePosition.setLongitude(Double.parseDouble(element.getStringValue())); continue; case "Latitude": mobilePosition.setLatitude(Double.parseDouble(element.getStringValue())); continue; case "Speed": String speedVal = element.getStringValue(); if (NumericUtil.isDouble(speedVal)) { mobilePosition.setSpeed(Double.parseDouble(speedVal)); } else { mobilePosition.setSpeed(0.0); } continue; case "Direction": String directionVal = element.getStringValue(); if 
(NumericUtil.isDouble(directionVal)) { mobilePosition.setDirection(Double.parseDouble(directionVal)); } else { mobilePosition.setDirection(0.0); } continue; case "Altitude": String altitudeVal = element.getStringValue(); if (NumericUtil.isDouble(altitudeVal)) { mobilePosition.setAltitude(Double.parseDouble(altitudeVal)); } else { mobilePosition.setAltitude(0.0); } continue; @Transactional public void process(List<RequestEvent> eventList) { if (eventList.isEmpty()) { return; } Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); List<MobilePosition> addMobilePositionList = new ArrayList<>(); for (RequestEvent evt : eventList) { try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); long startTime = System.currentTimeMillis(); // 回复 200 OK Element rootElement = getRootElement(evt); if (rootElement == null) { logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest()); return; } } Device device = redisCatchStorage.getDevice(deviceId); if (device == null) { logger.error("处理MobilePosition移动位置Notify时未获取到device,{}", deviceId); return; } MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setDeviceId(device.getDeviceId()); mobilePosition.setDeviceName(device.getName()); mobilePosition.setCreateTime(DateUtil.getNow()); List<Element> elements = rootElement.elements(); for (Element element : elements) { switch (element.getName()){ case "DeviceID": String channelId = element.getStringValue(); if (!deviceId.equals(channelId)) { mobilePosition.setChannelId(channelId); } continue; case "Time": String timeVal = element.getStringValue(); if (ObjectUtils.isEmpty(timeVal)) { mobilePosition.setTime(DateUtil.getNow()); } else { mobilePosition.setTime(SipUtils.parseTime(timeVal)); } continue; case "Longitude": mobilePosition.setLongitude(Double.parseDouble(element.getStringValue())); continue; case "Latitude": 
mobilePosition.setLatitude(Double.parseDouble(element.getStringValue())); continue; case "Speed": String speedVal = element.getStringValue(); if (NumericUtil.isDouble(speedVal)) { mobilePosition.setSpeed(Double.parseDouble(speedVal)); } else { mobilePosition.setSpeed(0.0); } continue; case "Direction": String directionVal = element.getStringValue(); if (NumericUtil.isDouble(directionVal)) { mobilePosition.setDirection(Double.parseDouble(directionVal)); } else { mobilePosition.setDirection(0.0); } continue; case "Altitude": String altitudeVal = element.getStringValue(); if (NumericUtil.isDouble(altitudeVal)) { mobilePosition.setAltitude(Double.parseDouble(altitudeVal)); } else { mobilePosition.setAltitude(0.0); } continue; } } // logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), // mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); mobilePosition.setReportSource("Mobile Position"); mobilePosition.setReportSource("Mobile Position"); // 更新device channel 的经纬度 DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setDeviceId(device.getDeviceId()); deviceChannel.setChannelId(channelId); deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); updateChannelMap.put(deviceId + channelId, deviceChannel); addMobilePositionList.add(mobilePosition); if(updateChannelMap.size() > 2000) { executeSaveChannel(); } if (userSetting.isSavePositionHistory()) { if(addMobilePositionList.size() > 2000) { executeSaveMobilePosition(); // 更新device channel 的经纬度 DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setDeviceId(device.getDeviceId()); deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); updateChannelMap.put(deviceId + 
mobilePosition.getChannelId(), deviceChannel); addMobilePositionList.add(mobilePosition); // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 try { eventPublisher.mobilePositionEventPublish(mobilePosition); }catch (Exception e) { logger.error("[向上级转发移动位置失败] ", e); } if (mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) || mobilePosition.getChannelId() == null) { List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId()); channels.forEach(channel -> { // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", channel.getDeviceId()); jsonObject.put("code", channel.getChannelId()); jsonObject.put("longitude", mobilePosition.getLongitude()); jsonObject.put("latitude", mobilePosition.getLatitude()); jsonObject.put("altitude", mobilePosition.getAltitude()); jsonObject.put("direction", mobilePosition.getDirection()); jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); }); }else { // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", mobilePosition.getDeviceId()); jsonObject.put("code", mobilePosition.getChannelId()); jsonObject.put("longitude", mobilePosition.getLongitude()); jsonObject.put("latitude", mobilePosition.getLatitude()); jsonObject.put("altitude", mobilePosition.getAltitude()); jsonObject.put("direction", mobilePosition.getDirection()); jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); } } catch (DocumentException e) { logger.error("未处理的异常 ", e); } // deviceChannel = deviceChannelService.updateGps(deviceChannel, device); // // mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); // mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); 
// mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); // mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); // deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); if (!dynamicTask.contains(talkKey)) { dynamicTask.startDelay(talkKey, this::executeSave, 3000); } } catch (DocumentException e) { logger.error("未处理的异常 ", e); } } private void executeSave(){ executeSaveChannel(); executeSaveMobilePosition(); dynamicTask.stop(talkKey); } @Async("taskExecutor") public void executeSaveChannel(){ dynamicTask.execute(); try { logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size()); ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); deviceChannelService.batchUpdateChannelGPS(deviceChannels); if(!updateChannelMap.isEmpty()) { List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values()); logger.info("[移动位置订阅]更新通道位置: {}", channels.size()); deviceChannelService.batchUpdateChannelGPS(channels); updateChannelMap.clear(); }catch (Exception e) { } } @Async("taskExecutor") public void executeSaveMobilePosition(){ if (userSetting.isSavePositionHistory()) { if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) { try { logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); deviceChannelService.batchAddMobilePosition(addMobilePositionList); addMobilePositionList.clear(); }catch (Exception e) { logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size()); } addMobilePositionList.clear(); } } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -37,6 +37,7 @@ import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.text.ParseException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -102,54 +103,62 @@ responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null); logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); return; }else { } else { responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); } }catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); } boolean runed = !taskQueue.isEmpty(); taskQueue.offer(new HandlerCatchData(evt, null, null)); if (!runed) { taskExecutor.execute(()-> { // logger.warn("开始处理"); while (!taskQueue.isEmpty()) { try { HandlerCatchData take = taskQueue.poll(); if (take == null) { continue; } Element rootElement = getRootElement(take.getEvt()); if (rootElement == null) { logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest()); continue; } String cmd = XmlUtil.getText(rootElement, "CmdType"); if (CmdType.CATALOG.equals(cmd)) { logger.info("接收到Catalog通知"); notifyRequestForCatalogProcessor.process(take.getEvt()); } else if (CmdType.ALARM.equals(cmd)) { logger.info("接收到Alarm通知"); processNotifyAlarm(take.getEvt()); } else if (CmdType.MOBILE_POSITION.equals(cmd)) { // logger.info("接收到MobilePosition通知"); // processNotifyMobilePosition(take.getEvt()); // taskExecutor.execute(() -> { notifyRequestForMobilePositionProcessor.process(take.getEvt()); // }); } else { logger.info("接收到消息:" + cmd); } } catch (DocumentException e) { logger.error("处理NOTIFY消息时错误", e); } } @Scheduled(fixedRate = 200) //每200毫秒执行一次 public void executeTaskQueue(){ if (taskQueue.isEmpty()) { return; } try { List<RequestEvent> catalogEventList = new ArrayList<>(); List<RequestEvent> alarmEventList = new ArrayList<>(); List<RequestEvent> 
mobilePositionEventList = new ArrayList<>(); for (HandlerCatchData take : taskQueue) { if (take == null) { continue; } }); Element rootElement = getRootElement(take.getEvt()); if (rootElement == null) { logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest()); continue; } String cmd = XmlUtil.getText(rootElement, "CmdType"); if (CmdType.CATALOG.equals(cmd)) { catalogEventList.add(take.getEvt()); } else if (CmdType.ALARM.equals(cmd)) { alarmEventList.add(take.getEvt()); } else if (CmdType.MOBILE_POSITION.equals(cmd)) { mobilePositionEventList.add(take.getEvt()); } else { logger.info("接收到消息:" + cmd); } } taskQueue.clear(); if (!alarmEventList.isEmpty()) { processNotifyAlarm(alarmEventList); } if (!catalogEventList.isEmpty()) { notifyRequestForCatalogProcessor.process(catalogEventList); } if (!mobilePositionEventList.isEmpty()) { notifyRequestForMobilePositionProcessor.process(mobilePositionEventList); } } catch (DocumentException e) { logger.error("处理NOTIFY消息时错误", e); } } /** * 处理MobilePosition移动位置Notify @@ -253,95 +262,97 @@ /*** * 处理alarm设备报警Notify * * @param evt */ private void processNotifyAlarm(RequestEvent evt) { private void processNotifyAlarm(List<RequestEvent> evtList) { if (!sipConfig.isAlarm()) { return; } try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); if (!evtList.isEmpty()) { for (RequestEvent evt : evtList) { try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); Element rootElement = getRootElement(evt); if (rootElement == null) { logger.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest()); return; } Element deviceIdElement = rootElement.element("DeviceID"); String channelId = deviceIdElement.getText().toString(); Element rootElement = getRootElement(evt); if (rootElement == null) { logger.error("处理alarm设备报警Notify时未获取到消息体{}", 
evt.getRequest()); return; } Element deviceIdElement = rootElement.element("DeviceID"); String channelId = deviceIdElement.getText().toString(); Device device = redisCatchStorage.getDevice(deviceId); if (device == null) { logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId); return; } rootElement = getRootElement(evt, device.getCharset()); if (rootElement == null) { logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); return; } DeviceAlarm deviceAlarm = new DeviceAlarm(); deviceAlarm.setDeviceId(deviceId); deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); if (alarmTime == null) { logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); return; } deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { deviceAlarm.setAlarmDescription(""); } else { deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); } if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); } else { deviceAlarm.setLongitude(0.00); } if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); } else { deviceAlarm.setLatitude(0.00); } logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); if ("4".equals(deviceAlarm.getAlarmMethod())) { MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setChannelId(channelId); mobilePosition.setCreateTime(DateUtil.getNow()); mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); mobilePosition.setTime(deviceAlarm.getAlarmTime()); mobilePosition.setLongitude(deviceAlarm.getLongitude()); 
mobilePosition.setLatitude(deviceAlarm.getLatitude()); mobilePosition.setReportSource("GPS Alarm"); Device device = redisCatchStorage.getDevice(deviceId); if (device == null) { logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId); return; } rootElement = getRootElement(evt, device.getCharset()); if (rootElement == null) { logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); return; } DeviceAlarm deviceAlarm = new DeviceAlarm(); deviceAlarm.setDeviceId(deviceId); deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); if (alarmTime == null) { logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); return; } deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { deviceAlarm.setAlarmDescription(""); } else { deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); } if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); } else { deviceAlarm.setLongitude(0.00); } if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); } else { deviceAlarm.setLatitude(0.00); } logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); if ("4".equals(deviceAlarm.getAlarmMethod())) { MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setChannelId(channelId); mobilePosition.setCreateTime(DateUtil.getNow()); mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); mobilePosition.setTime(deviceAlarm.getAlarmTime()); mobilePosition.setLongitude(deviceAlarm.getLongitude()); mobilePosition.setLatitude(deviceAlarm.getLatitude()); mobilePosition.setReportSource("GPS 
Alarm"); // 更新device channel 的经纬度 DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setDeviceId(device.getDeviceId()); deviceChannel.setChannelId(channelId); deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); // 更新device channel 的经纬度 DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setDeviceId(device.getDeviceId()); deviceChannel.setChannelId(channelId); deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); deviceChannel = deviceChannelService.updateGps(deviceChannel, device); deviceChannel = deviceChannelService.updateGps(deviceChannel, device); mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); } // 回复200 OK if (redisCatchStorage.deviceIsOnline(deviceId)) { publisher.deviceAlarmEventPublish(deviceAlarm); } } catch (DocumentException e) { logger.error("未处理的异常 ", e); } } // 回复200 OK if (redisCatchStorage.deviceIsOnline(deviceId)) { publisher.deviceAlarmEventPublish(deviceAlarm); } } catch (DocumentException e) { logger.error("未处理的异常 ", e); } } src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
@@ -102,4 +102,9 @@
    void batchAddMobilePosition(List<MobilePosition> addMobilePositionList);

    /**
     * Marks a single channel as online.
     * NOTE(review): the implementation visible in this change identifies the channel by
     * {@code channel.getDeviceId()} and {@code channel.getChannelId()} — confirm both are set by callers.
     *
     * @param channel the channel to mark online
     */
    void online(DeviceChannel channel);

    /**
     * Marks a single channel as offline.
     * NOTE(review): the implementation visible in this change identifies the channel by
     * {@code channel.getDeviceId()} and {@code channel.getChannelId()} — confirm both are set by callers.
     *
     * @param channel the channel to mark offline
     */
    void offline(DeviceChannel channel);

    /**
     * Deletes a single channel.
     * NOTE(review): the implementation visible in this change identifies the channel by
     * {@code channel.getDeviceId()} and {@code channel.getChannelId()} — confirm both are set by callers.
     *
     * @param channel the channel to delete
     */
    void delete(DeviceChannel channel);
}
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import java.util.ArrayList; @@ -248,8 +249,24 @@ } @Override public void online(DeviceChannel channel) { channelMapper.online(channel.getDeviceId(), channel.getChannelId()); } @Override public int channelsOffline(List<DeviceChannel> channels) { return channelMapper.batchOffline(channels); } @Override public void offline(DeviceChannel channel) { channelMapper.offline(channel.getDeviceId(), channel.getChannelId()); } @Override public void delete(DeviceChannel channel) { channelMapper.del(channel.getDeviceId(), channel.getChannelId()); } @Override @@ -355,12 +372,45 @@ } @Override @Transactional public void batchUpdateChannelGPS(List<DeviceChannel> channelList) { channelMapper.batchUpdate(channelList); for (DeviceChannel deviceChannel : channelList) { deviceChannel.setUpdateTime(DateUtil.getNow()); if (deviceChannel.getGpsTime() == null) { deviceChannel.setGpsTime(DateUtil.getNow()); } } int count = 1000; if (channelList.size() > count) { for (int i = 0; i < channelList.size(); i+=count) { int toIndex = i+count; if ( i + count > channelList.size()) { toIndex = channelList.size(); } List<DeviceChannel> channels = channelList.subList(i, toIndex); channelMapper.batchUpdatePosition(channels); } }else { channelMapper.batchUpdatePosition(channelList); } } @Override @Transactional public void batchAddMobilePosition(List<MobilePosition> mobilePositions) { // int count = 500; // if (mobilePositions.size() > count) { // for (int i = 0; i < mobilePositions.size(); i+=count) { // int toIndex = i+count; // if ( i + count > mobilePositions.size()) { // toIndex = mobilePositions.size(); // } // List<MobilePosition> mobilePositionsSub = mobilePositions.subList(i, toIndex); // 
deviceMobilePositionMapper.batchadd(mobilePositionsSub); // } // }else { // deviceMobilePositionMapper.batchadd(mobilePositions); // } deviceMobilePositionMapper.batchadd(mobilePositions); } } src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -401,6 +401,23 @@
            " </script>"})
    int updatePosition(DeviceChannel deviceChannel);

    /**
     * Updates the position columns of many channels in one round trip.
     * <p>
     * One {@code UPDATE} statement is rendered per list element, joined with {@code ';'}.
     * {@code gps_time} is always written; each coordinate column is written only when the
     * corresponding property is non-null. Rows are matched on {@code device_id} and, when
     * the element's {@code channelId} is non-null, on {@code channel_id} as well.
     * <p>
     * NOTE(review): a {@code ';'}-separated script needs multi-statement support from the
     * JDBC driver (e.g. MySQL {@code allowMultiQueries=true}) — confirm the datasource config.
     *
     * @param deviceChannelList channels whose position should be stored; must not be empty,
     *                          otherwise the rendered script contains no statement
     * @return the update count reported by MyBatis for the executed script
     */
    @Update({"<script>" +
            "<foreach collection='deviceChannelList' item='item' separator=';'>" +
            " UPDATE" +
            " wvp_device_channel" +
            " SET gps_time=#{item.gpsTime}" +
            "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
            "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
            "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
            "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
            "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
            "<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" +
            // Leading space keeps the last bind placeholder from being fused with the keyword ("?WHERE").
            " WHERE device_id=#{item.deviceId} " +
            " <if test='item.channelId != null' > AND channel_id=#{item.channelId}</if>" +
            "</foreach>" +
            "</script>"})
    int batchUpdatePosition(List<DeviceChannel> deviceChannelList);

    @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0")
    List<DeviceChannel> getAllChannelInPlay();
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
@@ -46,6 +46,20 @@
            "#{item.createTime}) " +
            "</foreach> " +
            "</script>")
    void batchadd2(List<MobilePosition> mobilePositions);

    /**
     * Inserts many mobile-position rows with a single multi-row {@code INSERT}.
     * <p>
     * The column list and {@code values} keyword are emitted once; the {@code <foreach>}
     * renders one parenthesised tuple per element, joined with {@code ','}. Wrapping a
     * complete {@code insert ...;} statement in a comma-separated {@code <foreach>} would
     * render {@code insert ...; , insert ...;}, which is not valid SQL for two or more rows.
     *
     * @param mobilePositions rows to insert; must not be empty, otherwise the statement
     *                        has no value tuples
     */
    @Insert("<script> " +
            "insert into wvp_device_mobile_position " +
            "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
            "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time) " +
            "values " +
            "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
            "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
            "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
            "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
            "#{item.createTime}) " +
            "</foreach> " +
            "</script>")
    void batchadd(List<MobilePosition> mobilePositions);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -570,7 +570,7 @@
    /**
     * Publishes a mobile-position message on the Redis channel named by
     * {@code VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION}.
     *
     * @param jsonObject the message payload handed to {@code redisTemplate.convertAndSend}
     */
    @Override
    public void sendMobilePositionMsg(JSONObject jsonObject) {
        String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION;
        // Logged at debug level with lazy formatting: the payload is only stringified when
        // debug logging is enabled, so the per-message cost is avoided by default.
        logger.debug("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject);
        redisTemplate.convertAndSend(key, jsonObject);
    }