From 66a681d67969ae64c5cfe7140cb60a7885edef86 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 23 四月 2024 20:45:12 +0800 Subject: [PATCH] 优化notify消息中目录和移动位置的处理 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java | 362 +++++--------------- src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java | 5 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 2 src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java | 52 +++ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java | 297 +++++++--------- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 243 +++++++------ src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java | 14 src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java | 17 + 8 files changed, 442 insertions(+), 550 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index c80cc88..de93804 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -1,7 +1,5 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.genersoft.iot.vmp.conf.CivilCodeFileConf; -import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -20,10 +18,10 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -39,8 +37,6 @@ private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class); - private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>(); - private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>(); private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>(); @@ -60,274 +56,110 @@ private IDeviceChannelService deviceChannelService; @Autowired - private DynamicTask dynamicTask; - - @Autowired - private CivilCodeFileConf civilCodeFileConf; - - @Autowired private SipConfig sipConfig; - private final static String talkKey = "notify-request-for-catalog-task"; + @Transactional + public void process(List<RequestEvent> evtList) { + if (evtList.isEmpty()) { + return; + } + for (RequestEvent evt : evtList) { + try { + long start = System.currentTimeMillis(); + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - public void process(RequestEvent evt) { - try { - long start = System.currentTimeMillis(); - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null || !device.isOnLine()) { + logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" )); + return; + } + Element rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest()); + return; + } + Element deviceListElement = rootElement.element("DeviceList"); + if (deviceListElement == null) { + return; + } + Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null || !device.isOnLine()) { - logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" )); - return; - } - Element rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest()); - return; - } - Element deviceListElement = rootElement.element("DeviceList"); - if (deviceListElement == null) { - return; - } - Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - - // 閬嶅巻DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element eventElement = itemDevice.element("Event"); - String event; - if (eventElement == null) { - logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" )); - event = CatalogEvent.ADD; - }else { - event = eventElement.getText().toUpperCase(); - } - DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); - if (channel == null) { - logger.info("[鏀跺埌鐩綍璁㈤槄]锛氫絾鏄В鏋愬け璐� {}", new String(evt.getRequest().getRawContent())); - continue; - } - if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { - channel.setParentId(null); - } - channel.setDeviceId(device.getDeviceId()); - logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}/{}", device.getDeviceId(), channel.getChannelId()); - switch (event) { - case CatalogEvent.ON: - // 涓婄嚎 - logger.info("[鏀跺埌閫氶亾涓婄嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - updateChannelOnlineList.add(channel); - if (updateChannelOnlineList.size() > 300) { - executeSaveForOnline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); - } - - break; - case CatalogEvent.OFF : - // 绂荤嚎 - logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - }else { - updateChannelOfflineList.add(channel); - if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.VLOST: - // 瑙嗛涓㈠け - logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - }else { - updateChannelOfflineList.add(channel); - if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.DEFECT: - // 鏁呴殰 - logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - }else { - updateChannelOfflineList.add(channel); - if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.ADD: - // 澧炲姞 - logger.info("[鏀跺埌澧炲姞閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - // 鍒ゆ柇姝ら�氶亾鏄惁瀛樺湪 - DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId()); - if (deviceChannel != null) { - logger.info("[澧炲姞閫氶亾] 宸插瓨鍦紝涓嶅彂閫侀�氱煡鍙洿鏂帮紝璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - channel.setId(deviceChannel.getId()); - updateChannelMap.put(channel.getChannelId(), channel); - if (updateChannelMap.keySet().size() > 300) { - executeSaveForUpdate(); - } - }else { - addChannelMap.put(channel.getChannelId(), channel); - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); - } - - if (addChannelMap.keySet().size() > 300) { - executeSaveForAdd(); - } - } - - break; - case CatalogEvent.DEL: - // 鍒犻櫎 - logger.info("[鏀跺埌鍒犻櫎閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - deleteChannelList.add(channel); - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); - } - if (deleteChannelList.size() > 300) { - executeSaveForDelete(); - } - break; - case CatalogEvent.UPDATE: - // 鏇存柊 - logger.info("[鏀跺埌鏇存柊閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - // 鍒ゆ柇姝ら�氶亾鏄惁瀛樺湪 - DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId()); - if (deviceChannelForUpdate != null) { - channel.setId(deviceChannelForUpdate.getId()); - channel.setUpdateTime(DateUtil.getNow()); - updateChannelMap.put(channel.getChannelId(), channel); - if (updateChannelMap.keySet().size() > 300) { - executeSaveForUpdate(); - } - }else { - addChannelMap.put(channel.getChannelId(), channel); - if (addChannelMap.keySet().size() > 300) { - executeSaveForAdd(); - } - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); - } - } - break; - default: - logger.warn("[ NotifyCatalog ] event not found 锛� {}", event ); - - } - // 杞彂鍙樺寲淇℃伅 - eventPublisher.catalogEventPublish(null, channel, event); - - if (!updateChannelMap.keySet().isEmpty() - || !addChannelMap.keySet().isEmpty() - || !updateChannelOnlineList.isEmpty() - || !updateChannelOfflineList.isEmpty() - || !deleteChannelList.isEmpty()) { - - if (!dynamicTask.contains(talkKey)) { - dynamicTask.startDelay(talkKey, this::executeSave, 1000); + // 閬嶅巻DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element eventElement = itemDevice.element("Event"); + String event; + if (eventElement == null) { + logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" )); + event = CatalogEvent.ADD; + }else { + event = eventElement.getText().toUpperCase(); } + DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); + if (channel == null) { + logger.info("[鏀跺埌鐩綍璁㈤槄]锛氫絾鏄В鏋愬け璐� {}", new String(evt.getRequest().getRawContent())); + continue; + } + if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { + channel.setParentId(null); + } + channel.setDeviceId(device.getDeviceId()); + logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}, {}/{}",event, device.getDeviceId(), channel.getChannelId()); + switch (event) { + case CatalogEvent.ON: + // 涓婄嚎 + deviceChannelService.online(channel); + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); + } + + break; + case CatalogEvent.OFF : + case CatalogEvent.VLOST: + case CatalogEvent.DEFECT: + // 绂荤嚎 + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[鐩綍璁㈤槄] 绂荤嚎 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + }else { + deviceChannelService.offline(channel); + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.DEL: + // 鍒犻櫎 + deviceChannelService.delete(channel); + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); + } + break; + case CatalogEvent.ADD: + case CatalogEvent.UPDATE: + // 鏇存柊 + channel.setUpdateTime(DateUtil.getNow()); + deviceChannelService.updateChannel(deviceId,channel); + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + } + break; + default: + logger.warn("[ NotifyCatalog ] event not found 锛� {}", event ); + + } + // 杞彂鍙樺寲淇℃伅 + eventPublisher.catalogEventPublish(null, channel, event); } } + } catch (DocumentException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); } - } catch (DocumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); } } - - // TODO 鍚屼竴涓�氶亾濡傛灉鍏堝彂閫佹洿鏂板啀鍙戦�佺绾垮彲鑳芥棤娉曟甯哥绾� - private void executeSave(){ - try { - executeSaveForAdd(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勫鍔犻�氶亾] 寮傚父锛� ", e ); - } - try { - executeSaveForOnline(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勯�氶亾涓婄嚎] 寮傚父锛� ", e ); - } - try { - executeSaveForOffline(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勯�氶亾绂荤嚎] 寮傚父锛� ", e ); - } - try { - executeSaveForUpdate(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勬洿鏂伴�氶亾] 寮傚父锛� ", e ); - } - try { - executeSaveForDelete(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勫垹闄ら�氶亾] 寮傚父锛� ", e ); - } - - dynamicTask.stop(talkKey); - } - - private void executeSaveForUpdate(){ - if (!updateChannelMap.values().isEmpty()) { - logger.info("[瀛樺偍鏀跺埌鐨勬洿鏂伴�氶亾], 鏁伴噺锛� {}", updateChannelMap.size()); - ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); - deviceChannelService.batchUpdateChannel(deviceChannels); - updateChannelMap.clear(); - } - - } - - private void executeSaveForAdd(){ - if (!addChannelMap.values().isEmpty()) { - ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values()); - addChannelMap.clear(); - deviceChannelService.batchAddChannel(deviceChannels); - } - } - - private void executeSaveForDelete(){ - if (!deleteChannelList.isEmpty()) { - deviceChannelService.deleteChannels(deleteChannelList); - deleteChannelList.clear(); - } - } - - private void executeSaveForOnline(){ - if (!updateChannelOnlineList.isEmpty()) { - deviceChannelService.channelsOnline(updateChannelOnlineList); - updateChannelOnlineList.clear(); - } - } - - private void executeSaveForOffline(){ - if (!updateChannelOfflineList.isEmpty()) { - deviceChannelService.channelsOffline(updateChannelOfflineList); - updateChannelOfflineList.clear(); - } - } - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 1b7daf0..05a5336 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -1,8 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.genersoft.iot.vmp.conf.CivilCodeFileConf; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.SipConfig; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; @@ -19,8 +17,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; @@ -29,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; /** * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰涓殑绉诲姩浣嶇疆璇锋眰澶勭悊 @@ -39,10 +36,6 @@ private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class); - - private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); - - private final List<MobilePosition> addMobilePositionList = new CopyOnWriteArrayList(); @Autowired @@ -57,180 +50,150 @@ @Autowired private IDeviceChannelService deviceChannelService; - @Autowired - private DynamicTask dynamicTask; - - @Autowired - private CivilCodeFileConf civilCodeFileConf; - - @Autowired - private SipConfig sipConfig; - - private final static String talkKey = "notify-request-for-mobile-position-task"; - - @Async("taskExecutor") - public void process(RequestEvent evt) { - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - long startTime = System.currentTimeMillis(); - // 鍥炲 200 OK - Element rootElement = getRootElement(evt); - if (rootElement == null) { - logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒版秷鎭綋,{}", evt.getRequest()); - return; - } - Device device = redisCatchStorage.getDevice(deviceId); - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setCreateTime(DateUtil.getNow()); - List<Element> elements = rootElement.elements(); - String channelId = null; - for (Element element : elements) { - switch (element.getName()){ - case "DeviceID": - channelId = element.getStringValue(); - if (device == null) { - device = redisCatchStorage.getDevice(channelId); - if (device == null) { - // 鏍规嵁閫氶亾id鏌ヨ璁惧Id - List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId); - if (!deviceList.isEmpty()) { - device = deviceList.get(0); - } - } - } - if (device == null) { - logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId); - return; - } - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setChannelId(channelId); - // 鍏煎璁惧閮ㄥ垎璁惧涓婃姤鏄�氶亾缂栧彿涓庤澶囩紪鍙蜂竴鑷寸殑鎯呭喌 - if (deviceId.equals(channelId)) { - List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); - if (deviceChannels.size() == 1) { - channelId = deviceChannels.get(0).getChannelId(); - } - } - if (!ObjectUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); - } - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setChannelId(channelId); - continue; - case "Time": - String timeVal = element.getStringValue(); - if (ObjectUtils.isEmpty(timeVal)) { - mobilePosition.setTime(DateUtil.getNow()); - } else { - mobilePosition.setTime(SipUtils.parseTime(timeVal)); - } - continue; - case "Longitude": - mobilePosition.setLongitude(Double.parseDouble(element.getStringValue())); - continue; - case "Latitude": - mobilePosition.setLatitude(Double.parseDouble(element.getStringValue())); - continue; - case "Speed": - String speedVal = element.getStringValue(); - if (NumericUtil.isDouble(speedVal)) { - mobilePosition.setSpeed(Double.parseDouble(speedVal)); - } else { - mobilePosition.setSpeed(0.0); - } - continue; - case "Direction": - String directionVal = element.getStringValue(); - if (NumericUtil.isDouble(directionVal)) { - mobilePosition.setDirection(Double.parseDouble(directionVal)); - } else { - mobilePosition.setDirection(0.0); - } - continue; - case "Altitude": - String altitudeVal = element.getStringValue(); - if (NumericUtil.isDouble(altitudeVal)) { - mobilePosition.setAltitude(Double.parseDouble(altitudeVal)); - } else { - mobilePosition.setAltitude(0.0); - } - continue; - + @Transactional + public void process(List<RequestEvent> eventList) { + if (eventList.isEmpty()) { + return; + } + Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); + List<MobilePosition> addMobilePositionList = new ArrayList<>(); + for (RequestEvent evt : eventList) { + try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + long startTime = System.currentTimeMillis(); + // 鍥炲 200 OK + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒版秷鎭綋,{}", evt.getRequest()); + return; } - } + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null) { + logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒癲evice,{}", deviceId); + return; + } + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setDeviceId(device.getDeviceId()); + mobilePosition.setDeviceName(device.getName()); + mobilePosition.setCreateTime(DateUtil.getNow()); + List<Element> elements = rootElement.elements(); + for (Element element : elements) { + switch (element.getName()){ + case "DeviceID": + String channelId = element.getStringValue(); + if (!deviceId.equals(channelId)) { + mobilePosition.setChannelId(channelId); + } + continue; + case "Time": + String timeVal = element.getStringValue(); + if (ObjectUtils.isEmpty(timeVal)) { + mobilePosition.setTime(DateUtil.getNow()); + } else { + mobilePosition.setTime(SipUtils.parseTime(timeVal)); + } + continue; + case "Longitude": + mobilePosition.setLongitude(Double.parseDouble(element.getStringValue())); + continue; + case "Latitude": + mobilePosition.setLatitude(Double.parseDouble(element.getStringValue())); + continue; + case "Speed": + String speedVal = element.getStringValue(); + if (NumericUtil.isDouble(speedVal)) { + mobilePosition.setSpeed(Double.parseDouble(speedVal)); + } else { + mobilePosition.setSpeed(0.0); + } + continue; + case "Direction": + String directionVal = element.getStringValue(); + if (NumericUtil.isDouble(directionVal)) { + mobilePosition.setDirection(Double.parseDouble(directionVal)); + } else { + mobilePosition.setDirection(0.0); + } + continue; + case "Altitude": + String altitudeVal = element.getStringValue(); + if (NumericUtil.isDouble(altitudeVal)) { + mobilePosition.setAltitude(Double.parseDouble(altitudeVal)); + } else { + mobilePosition.setAltitude(0.0); + } + continue; + + } + } // logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}, 鏃堕棿锛� {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), // mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); - mobilePosition.setReportSource("Mobile Position"); + mobilePosition.setReportSource("Mobile Position"); - // 鏇存柊device channel 鐨勭粡绾害 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setChannelId(channelId); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - updateChannelMap.put(deviceId + channelId, deviceChannel); - addMobilePositionList.add(mobilePosition); - if(updateChannelMap.size() > 2000) { - executeSaveChannel(); - } - if (userSetting.isSavePositionHistory()) { - if(addMobilePositionList.size() > 2000) { - executeSaveMobilePosition(); + // 鏇存柊device channel 鐨勭粡绾害 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); + updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel); + addMobilePositionList.add(mobilePosition); + + + // 鍚戝叧鑱斾簡璇ラ�氶亾骞朵笖寮�鍚Щ鍔ㄤ綅缃闃呯殑涓婄骇骞冲彴鍙戦�佺Щ鍔ㄤ綅缃闃呮秷鎭� + try { + eventPublisher.mobilePositionEventPublish(mobilePosition); + }catch (Exception e) { + logger.error("[鍚戜笂绾ц浆鍙戠Щ鍔ㄤ綅缃け璐 ", e); } + if (mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) || mobilePosition.getChannelId() == null) { + List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId()); + channels.forEach(channel -> { + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); + jsonObject.put("serial", channel.getDeviceId()); + jsonObject.put("code", channel.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); + }); + }else { + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); + jsonObject.put("serial", mobilePosition.getDeviceId()); + jsonObject.put("code", mobilePosition.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); + } + } catch (DocumentException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); } - -// deviceChannel = deviceChannelService.updateGps(deviceChannel, device); -// -// mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); -// mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); -// mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); -// mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - -// deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); - - if (!dynamicTask.contains(talkKey)) { - dynamicTask.startDelay(talkKey, this::executeSave, 3000); - } - - } catch (DocumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); } - - - } - - private void executeSave(){ - executeSaveChannel(); - executeSaveMobilePosition(); - dynamicTask.stop(talkKey); - } - - @Async("taskExecutor") - public void executeSaveChannel(){ - dynamicTask.execute(); - try { - logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", updateChannelMap.size()); - ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); - deviceChannelService.batchUpdateChannelGPS(deviceChannels); + if(!updateChannelMap.isEmpty()) { + List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values()); + logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", channels.size()); + deviceChannelService.batchUpdateChannelGPS(channels); updateChannelMap.clear(); - }catch (Exception e) { - } - } - @Async("taskExecutor") - public void executeSaveMobilePosition(){ - if (userSetting.isSavePositionHistory()) { + if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) { try { logger.info("[绉诲姩浣嶇疆璁㈤槄] 娣诲姞閫氶亾杞ㄨ抗鐐逛綅锛� {}", addMobilePositionList.size()); deviceChannelService.batchAddMobilePosition(addMobilePositionList); - addMobilePositionList.clear(); }catch (Exception e) { logger.info("[绉诲姩浣嶇疆璁㈤槄] b娣诲姞閫氶亾杞ㄨ抗鐐逛綅淇濆瓨澶辫触锛� {}", addMobilePositionList.size()); } + addMobilePositionList.clear(); } } - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index da348b7..8a42624 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -37,6 +37,7 @@ import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.text.ParseException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -102,54 +103,62 @@ responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null); logger.error("[notify] 寰呭鐞嗘秷鎭槦鍒楀凡婊� {}锛岃繑鍥�486 BUSY_HERE锛屾秷鎭笉鍋氬鐞�", userSetting.getMaxNotifyCountQueue()); return; - }else { + } else { responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); } - }catch (SipException | InvalidArgumentException | ParseException e) { + } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); } boolean runed = !taskQueue.isEmpty(); taskQueue.offer(new HandlerCatchData(evt, null, null)); - if (!runed) { - taskExecutor.execute(()-> { -// logger.warn("寮�濮嬪鐞�"); - while (!taskQueue.isEmpty()) { - try { - HandlerCatchData take = taskQueue.poll(); - if (take == null) { - continue; - } - Element rootElement = getRootElement(take.getEvt()); - if (rootElement == null) { - logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest()); - continue; - } - String cmd = XmlUtil.getText(rootElement, "CmdType"); - - if (CmdType.CATALOG.equals(cmd)) { - logger.info("鎺ユ敹鍒癈atalog閫氱煡"); - notifyRequestForCatalogProcessor.process(take.getEvt()); - } else if (CmdType.ALARM.equals(cmd)) { - logger.info("鎺ユ敹鍒癆larm閫氱煡"); - processNotifyAlarm(take.getEvt()); - } else if (CmdType.MOBILE_POSITION.equals(cmd)) { -// logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); -// processNotifyMobilePosition(take.getEvt()); -// taskExecutor.execute(() -> { - notifyRequestForMobilePositionProcessor.process(take.getEvt()); -// }); - - } else { - logger.info("鎺ユ敹鍒版秷鎭細" + cmd); - } - } catch (DocumentException e) { - logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e); - } + } + @Scheduled(fixedRate = 200) //姣�200姣鎵ц涓�娆� + public void executeTaskQueue(){ + if (taskQueue.isEmpty()) { + return; + } + try { + List<RequestEvent> catalogEventList = new ArrayList<>(); + List<RequestEvent> alarmEventList = new ArrayList<>(); + List<RequestEvent> mobilePositionEventList = new ArrayList<>(); + for (HandlerCatchData take : taskQueue) { + if (take == null) { + continue; } - }); + Element rootElement = getRootElement(take.getEvt()); + if (rootElement == null) { + logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest()); + continue; + } + String cmd = XmlUtil.getText(rootElement, "CmdType"); + + if (CmdType.CATALOG.equals(cmd)) { + catalogEventList.add(take.getEvt()); + } else if (CmdType.ALARM.equals(cmd)) { + alarmEventList.add(take.getEvt()); + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { + mobilePositionEventList.add(take.getEvt()); + } else { + logger.info("鎺ユ敹鍒版秷鎭細" + cmd); + } + } + taskQueue.clear(); + if (!alarmEventList.isEmpty()) { + processNotifyAlarm(alarmEventList); + } + if (!catalogEventList.isEmpty()) { + notifyRequestForCatalogProcessor.process(catalogEventList); + } + if (!mobilePositionEventList.isEmpty()) { + notifyRequestForMobilePositionProcessor.process(mobilePositionEventList); + } + } catch (DocumentException e) { + logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e); } } + + /** * 澶勭悊MobilePosition绉诲姩浣嶇疆Notify @@ -253,95 +262,97 @@ /*** * 澶勭悊alarm璁惧鎶ヨNotify - * - * @param evt */ - private void processNotifyAlarm(RequestEvent evt) { + private void processNotifyAlarm(List<RequestEvent> evtList) { if (!sipConfig.isAlarm()) { return; } - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + if (!evtList.isEmpty()) { + for (RequestEvent evt : evtList) { + try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - Element rootElement = getRootElement(evt); - if (rootElement == null) { - logger.error("澶勭悊alarm璁惧鎶ヨNotify鏃舵湭鑾峰彇鍒版秷鎭綋{}", evt.getRequest()); - return; - } - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getText().toString(); + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("澶勭悊alarm璁惧鎶ヨNotify鏃舵湭鑾峰彇鍒版秷鎭綋{}", evt.getRequest()); + return; + } + Element deviceIdElement = rootElement.element("DeviceID"); + String channelId = deviceIdElement.getText().toString(); - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null) { - logger.warn("[ NotifyAlarm ] 鏈壘鍒拌澶囷細{}", deviceId); - return; - } - rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); - return; - } - DeviceAlarm deviceAlarm = new DeviceAlarm(); - deviceAlarm.setDeviceId(deviceId); - deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); - deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); - String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); - if (alarmTime == null) { - logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); - return; - } - deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); - if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { - deviceAlarm.setAlarmDescription(""); - } else { - deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { - deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); - } else { - deviceAlarm.setLongitude(0.00); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { - deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); - } else { - deviceAlarm.setLatitude(0.00); - } - logger.info("[鏀跺埌Notify-Alarm]锛歿}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); - if ("4".equals(deviceAlarm.getAlarmMethod())) { - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setChannelId(channelId); - mobilePosition.setCreateTime(DateUtil.getNow()); - mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); - mobilePosition.setTime(deviceAlarm.getAlarmTime()); - mobilePosition.setLongitude(deviceAlarm.getLongitude()); - mobilePosition.setLatitude(deviceAlarm.getLatitude()); - mobilePosition.setReportSource("GPS Alarm"); + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null) { + logger.warn("[ NotifyAlarm ] 鏈壘鍒拌澶囷細{}", deviceId); + return; + } + rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); + return; + } + DeviceAlarm deviceAlarm = new DeviceAlarm(); + deviceAlarm.setDeviceId(deviceId); + deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); + deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); + String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); + if (alarmTime == null) { + logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); + return; + } + deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); + if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { + deviceAlarm.setAlarmDescription(""); + } else { + deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { + deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); + } else { + deviceAlarm.setLongitude(0.00); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { + deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); + } else { + deviceAlarm.setLatitude(0.00); + } + logger.info("[鏀跺埌Notify-Alarm]锛歿}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); + if ("4".equals(deviceAlarm.getAlarmMethod())) { + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setChannelId(channelId); + mobilePosition.setCreateTime(DateUtil.getNow()); + mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); + mobilePosition.setTime(deviceAlarm.getAlarmTime()); + mobilePosition.setLongitude(deviceAlarm.getLongitude()); + mobilePosition.setLatitude(deviceAlarm.getLatitude()); + mobilePosition.setReportSource("GPS Alarm"); - // 鏇存柊device channel 鐨勭粡绾害 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setChannelId(channelId); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); + // 鏇存柊device channel 鐨勭粡绾害 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setChannelId(channelId); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); - deviceChannel = deviceChannelService.updateGps(deviceChannel, device); + deviceChannel = deviceChannelService.updateGps(deviceChannel, device); - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + } + + // 鍥炲200 OK + if (redisCatchStorage.deviceIsOnline(deviceId)) { + publisher.deviceAlarmEventPublish(deviceAlarm); + } + } catch (DocumentException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } } - - // 鍥炲200 OK - if (redisCatchStorage.deviceIsOnline(deviceId)) { - publisher.deviceAlarmEventPublish(deviceAlarm); - } - } catch (DocumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java index 7dd04e8..a8cd581 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java @@ -102,4 +102,9 @@ void batchAddMobilePosition(List<MobilePosition> addMobilePositionList); + void online(DeviceChannel channel); + + void offline(DeviceChannel channel); + + void delete(DeviceChannel channel); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index f766640..67e34a0 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import java.util.ArrayList; @@ -248,8 +249,24 @@ } @Override + public void online(DeviceChannel channel) { + channelMapper.online(channel.getDeviceId(), channel.getChannelId()); + } + + @Override public int channelsOffline(List<DeviceChannel> channels) { return channelMapper.batchOffline(channels); + } + + + @Override + public void offline(DeviceChannel channel) { + channelMapper.offline(channel.getDeviceId(), channel.getChannelId()); + } + + @Override + public void delete(DeviceChannel channel) { + channelMapper.del(channel.getDeviceId(), channel.getChannelId()); } @Override @@ -355,12 +372,45 @@ } @Override + @Transactional public void batchUpdateChannelGPS(List<DeviceChannel> channelList) { - channelMapper.batchUpdate(channelList); + for (DeviceChannel deviceChannel : channelList) { + deviceChannel.setUpdateTime(DateUtil.getNow()); + if (deviceChannel.getGpsTime() == null) { + deviceChannel.setGpsTime(DateUtil.getNow()); + } + } + int count = 1000; + if (channelList.size() > count) { + for (int i = 0; i < channelList.size(); i+=count) { + int toIndex = i+count; + if ( i + count > channelList.size()) { + toIndex = channelList.size(); + } + List<DeviceChannel> channels = channelList.subList(i, toIndex); + channelMapper.batchUpdatePosition(channels); + } + }else { + channelMapper.batchUpdatePosition(channelList); + } } @Override + @Transactional public void batchAddMobilePosition(List<MobilePosition> mobilePositions) { +// int count = 500; +// if (mobilePositions.size() > count) { +// for (int i = 0; i < mobilePositions.size(); i+=count) { +// int toIndex = i+count; +// if ( i + count > mobilePositions.size()) { +// toIndex = mobilePositions.size(); +// } +// List<MobilePosition> mobilePositionsSub = mobilePositions.subList(i, toIndex); +// deviceMobilePositionMapper.batchadd(mobilePositionsSub); +// } +// }else { +// deviceMobilePositionMapper.batchadd(mobilePositions); +// } deviceMobilePositionMapper.batchadd(mobilePositions); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index ae22336..10d0ee1 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -401,6 +401,23 @@ " </script>"}) int updatePosition(DeviceChannel deviceChannel); + @Update({"<script>" + + "<foreach collection='deviceChannelList' item='item' separator=';'>" + + " UPDATE" + + " wvp_device_channel" + + " SET gps_time=#{item.gpsTime}" + + "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" + + "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" + + "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" + + "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" + + "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" + + "<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" + + "WHERE device_id=#{item.deviceId} " + + " <if test='item.channelId != null' > AND channel_id=#{item.channelId}</if>" + + "</foreach>" + + "</script>"}) + int batchUpdatePosition(List<DeviceChannel> deviceChannelList); + @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0") List<DeviceChannel> getAllChannelInPlay(); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java index e3d8982..c28b16e 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java @@ -46,6 +46,20 @@ "#{item.createTime}) " + "</foreach> " + "</script>") + void batchadd2(List<MobilePosition> mobilePositions); + + @Insert("<script> " + + "<foreach collection='mobilePositions' index='index' item='item' separator=','> " + + "insert into wvp_device_mobile_position " + + "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," + + "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+ + "values " + + "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " + + "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," + + "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " + + "#{item.createTime}); " + + "</foreach> " + + "</script>") void batchadd(List<MobilePosition> mobilePositions); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index e42ea68..8b9cc07 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -570,7 +570,7 @@ @Override public void sendMobilePositionMsg(JSONObject jsonObject) { String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION; - logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 绉诲姩浣嶇疆 {}: {}", key, jsonObject.toString()); +// logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 绉诲姩浣嶇疆 {}: {}", key, jsonObject.toString()); redisTemplate.convertAndSend(key, jsonObject); } -- Gitblit v1.8.0