From d41d6b34af2485198ed01e1888db1571e4da1a6a Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 23 四月 2024 20:59:20 +0800 Subject: [PATCH] Merge branch 'refs/heads/2.7.0' --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java | 363 +++++++++++++-------------------------------------- 1 files changed, 97 insertions(+), 266 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index cd97786..de93804 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -1,7 +1,5 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.genersoft.iot.vmp.conf.CivilCodeFileConf; -import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -20,10 +18,10 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -39,8 +37,6 @@ private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class); - private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>(); - private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>(); private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>(); @@ -60,275 +56,110 @@ private IDeviceChannelService deviceChannelService; @Autowired - private DynamicTask dynamicTask; - - @Autowired - private CivilCodeFileConf civilCodeFileConf; - - @Autowired private SipConfig sipConfig; - private final static String talkKey = "notify-request-for-catalog-task"; + @Transactional + public void process(List<RequestEvent> evtList) { + if (evtList.isEmpty()) { + return; + } + for (RequestEvent evt : evtList) { + try { + long start = System.currentTimeMillis(); + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - public void process(RequestEvent evt) { - try { - long start = System.currentTimeMillis(); - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null || !device.isOnLine()) { + logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" )); + return; + } + Element rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest()); + return; + } + Element deviceListElement = rootElement.element("DeviceList"); + if (deviceListElement == null) { + return; + } + Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null || !device.isOnLine()) { - logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" )); - return; - } - Element rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest()); - return; - } - Element deviceListElement = rootElement.element("DeviceList"); - if (deviceListElement == null) { - return; - } - Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - - // 閬嶅巻DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - continue; - } - Element eventElement = itemDevice.element("Event"); - String event; - if (eventElement == null) { - logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" )); - event = CatalogEvent.ADD; - }else { - event = eventElement.getText().toUpperCase(); - } - DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); - if (channel == null) { - logger.info("[鏀跺埌鐩綍璁㈤槄]锛氫絾鏄В鏋愬け璐� {}", new String(evt.getRequest().getRawContent())); - continue; - } - if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { - channel.setParentId(null); - } - channel.setDeviceId(device.getDeviceId()); - logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}/{}", device.getDeviceId(), channel.getChannelId()); - switch (event) { - case CatalogEvent.ON: - // 涓婄嚎 - logger.info("[鏀跺埌閫氶亾涓婄嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - updateChannelOnlineList.add(channel); - if (updateChannelOnlineList.size() > 300) { - executeSaveForOnline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); - } - - break; - case CatalogEvent.OFF : - // 绂荤嚎 - logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - }else { - updateChannelOfflineList.add(channel); - if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.VLOST: - // 瑙嗛涓㈠け - logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - }else { - updateChannelOfflineList.add(channel); - if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.DEFECT: - // 鏁呴殰 - logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - }else { - updateChannelOfflineList.add(channel); - if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.ADD: - // 澧炲姞 - logger.info("[鏀跺埌澧炲姞閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - // 鍒ゆ柇姝ら�氶亾鏄惁瀛樺湪 - DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId()); - if (deviceChannel != null) { - logger.info("[澧炲姞閫氶亾] 宸插瓨鍦紝涓嶅彂閫侀�氱煡鍙洿鏂帮紝璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - channel.setId(deviceChannel.getId()); - updateChannelMap.put(channel.getChannelId(), channel); - if (updateChannelMap.keySet().size() > 300) { - executeSaveForUpdate(); - } - }else { - addChannelMap.put(channel.getChannelId(), channel); - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); - } - - if (addChannelMap.keySet().size() > 300) { - executeSaveForAdd(); - } - } - - break; - case CatalogEvent.DEL: - // 鍒犻櫎 - logger.info("[鏀跺埌鍒犻櫎閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - deleteChannelList.add(channel); - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); - } - if (deleteChannelList.size() > 300) { - executeSaveForDelete(); - } - break; - case CatalogEvent.UPDATE: - // 鏇存柊 - logger.info("[鏀跺埌鏇存柊閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - // 鍒ゆ柇姝ら�氶亾鏄惁瀛樺湪 - DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId()); - if (deviceChannelForUpdate != null) { - channel.setId(deviceChannelForUpdate.getId()); - channel.setUpdateTime(DateUtil.getNow()); - updateChannelMap.put(channel.getChannelId(), channel); - if (updateChannelMap.keySet().size() > 300) { - executeSaveForUpdate(); - } - }else { - addChannelMap.put(channel.getChannelId(), channel); - if (addChannelMap.keySet().size() > 300) { - executeSaveForAdd(); - } - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); - } - } - break; - default: - logger.warn("[ NotifyCatalog ] event not found 锛� {}", event ); - - } - // 杞彂鍙樺寲淇℃伅 - eventPublisher.catalogEventPublish(null, channel, event); - - if (!updateChannelMap.keySet().isEmpty() - || !addChannelMap.keySet().isEmpty() - || !updateChannelOnlineList.isEmpty() - || !updateChannelOfflineList.isEmpty() - || !deleteChannelList.isEmpty()) { - - if (!dynamicTask.contains(talkKey)) { - dynamicTask.startDelay(talkKey, this::executeSave, 1000); + // 閬嶅巻DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element eventElement = itemDevice.element("Event"); + String event; + if (eventElement == null) { + logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" )); + event = CatalogEvent.ADD; + }else { + event = eventElement.getText().toUpperCase(); } + DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); + if (channel == null) { + logger.info("[鏀跺埌鐩綍璁㈤槄]锛氫絾鏄В鏋愬け璐� {}", new String(evt.getRequest().getRawContent())); + continue; + } + if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { + channel.setParentId(null); + } + channel.setDeviceId(device.getDeviceId()); + logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}, {}/{}",event, device.getDeviceId(), channel.getChannelId()); + switch (event) { + case CatalogEvent.ON: + // 涓婄嚎 + deviceChannelService.online(channel); + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); + } + + break; + case CatalogEvent.OFF : + case CatalogEvent.VLOST: + case CatalogEvent.DEFECT: + // 绂荤嚎 + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[鐩綍璁㈤槄] 绂荤嚎 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + }else { + deviceChannelService.offline(channel); + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.DEL: + // 鍒犻櫎 + deviceChannelService.delete(channel); + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); + } + break; + case CatalogEvent.ADD: + case CatalogEvent.UPDATE: + // 鏇存柊 + channel.setUpdateTime(DateUtil.getNow()); + deviceChannelService.updateChannel(deviceId,channel); + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + } + break; + default: + logger.warn("[ NotifyCatalog ] event not found 锛� {}", event ); + + } + // 杞彂鍙樺寲淇℃伅 + eventPublisher.catalogEventPublish(null, channel, event); } } + } catch (DocumentException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); } - } catch (DocumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); } } - - private void executeSave(){ - try { - executeSaveForAdd(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勫鍔犻�氶亾] 寮傚父锛� ", e ); - } - try { - executeSaveForUpdate(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勬洿鏂伴�氶亾] 寮傚父锛� ", e ); - } - try { - executeSaveForDelete(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勫垹闄ら�氶亾] 寮傚父锛� ", e ); - } - try { - executeSaveForOnline(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勯�氶亾涓婄嚎] 寮傚父锛� ", e ); - } - try { - executeSaveForOffline(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勯�氶亾绂荤嚎] 寮傚父锛� ", e ); - } - dynamicTask.stop(talkKey); - } - - private void executeSaveForUpdate(){ - if (!updateChannelMap.values().isEmpty()) { - ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); - updateChannelMap.clear(); - deviceChannelService.batchUpdateChannel(deviceChannels); - } - - } - - private void executeSaveForAdd(){ - if (!addChannelMap.values().isEmpty()) { - ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values()); - addChannelMap.clear(); - deviceChannelService.batchAddChannel(deviceChannels); - } - } - - private void executeSaveForDelete(){ - if (!deleteChannelList.isEmpty()) { - deviceChannelService.deleteChannels(deleteChannelList); - deleteChannelList.clear(); - } - } - - private void executeSaveForOnline(){ - if (!updateChannelOnlineList.isEmpty()) { - deviceChannelService.channelsOnline(updateChannelOnlineList); - updateChannelOnlineList.clear(); - } - } - - private void executeSaveForOffline(){ - if (!updateChannelOfflineList.isEmpty()) { - deviceChannelService.channelsOffline(updateChannelOfflineList); - updateChannelOfflineList.clear(); - } - } - } -- Gitblit v1.8.0