From d41d6b34af2485198ed01e1888db1571e4da1a6a Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 23 四月 2024 20:59:20 +0800
Subject: [PATCH] Merge branch 'refs/heads/2.7.0'

---
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java |  363 +++++++++++++--------------------------------------
 1 files changed, 97 insertions(+), 266 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
index cd97786..de93804 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
@@ -1,7 +1,5 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
-import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
-import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
@@ -20,10 +18,10 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 
 import javax.sip.RequestEvent;
 import javax.sip.header.FromHeader;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -39,8 +37,6 @@
 
     private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class);
 
-	private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>();
-	private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>();
 	private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
 
 	private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
@@ -60,275 +56,110 @@
 	private IDeviceChannelService deviceChannelService;
 
 	@Autowired
-	private DynamicTask dynamicTask;
-
-	@Autowired
-	private CivilCodeFileConf civilCodeFileConf;
-
-	@Autowired
 	private SipConfig sipConfig;
 
-	private final static String talkKey = "notify-request-for-catalog-task";
+	@Transactional
+	public void process(List<RequestEvent> evtList) {
+		if (evtList.isEmpty()) {
+			return;
+		}
+		for (RequestEvent evt : evtList) {
+			try {
+				long start = System.currentTimeMillis();
+				FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
+				String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
 
-	public void process(RequestEvent evt) {
-		try {
-			long start = System.currentTimeMillis();
-			FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
-			String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
+				Device device = redisCatchStorage.getDevice(deviceId);
+				if (device == null || !device.isOnLine()) {
+					logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" ));
+					return;
+				}
+				Element rootElement = getRootElement(evt, device.getCharset());
+				if (rootElement == null) {
+					logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest());
+					return;
+				}
+				Element deviceListElement = rootElement.element("DeviceList");
+				if (deviceListElement == null) {
+					return;
+				}
+				Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
+				if (deviceListIterator != null) {
 
-			Device device = redisCatchStorage.getDevice(deviceId);
-			if (device == null || !device.isOnLine()) {
-				logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" ));
-				return;
-			}
-			Element rootElement = getRootElement(evt, device.getCharset());
-			if (rootElement == null) {
-				logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest());
-				return;
-			}
-			Element deviceListElement = rootElement.element("DeviceList");
-			if (deviceListElement == null) {
-				return;
-			}
-			Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
-			if (deviceListIterator != null) {
-
-				// 閬嶅巻DeviceList
-				while (deviceListIterator.hasNext()) {
-					Element itemDevice = deviceListIterator.next();
-					Element channelDeviceElement = itemDevice.element("DeviceID");
-					if (channelDeviceElement == null) {
-						continue;
-					}
-					Element eventElement = itemDevice.element("Event");
-					String event;
-					if (eventElement == null) {
-						logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" ));
-						event = CatalogEvent.ADD;
-					}else {
-						event = eventElement.getText().toUpperCase();
-					}
-					DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
-					if (channel == null) {
-						logger.info("[鏀跺埌鐩綍璁㈤槄]锛氫絾鏄В鏋愬け璐� {}", new String(evt.getRequest().getRawContent()));
-						continue;
-					}
-					if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
-						channel.setParentId(null);
-					}
-					channel.setDeviceId(device.getDeviceId());
-					logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}/{}", device.getDeviceId(), channel.getChannelId());
-					switch (event) {
-						case CatalogEvent.ON:
-							// 涓婄嚎
-							logger.info("[鏀跺埌閫氶亾涓婄嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							updateChannelOnlineList.add(channel);
-							if (updateChannelOnlineList.size() > 300) {
-								executeSaveForOnline();
-							}
-							if (userSetting.getDeviceStatusNotify()) {
-								// 鍙戦�乺edis娑堟伅
-								redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
-							}
-
-							break;
-						case CatalogEvent.OFF :
-							// 绂荤嚎
-							logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
-								logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							}else {
-								updateChannelOfflineList.add(channel);
-								if (updateChannelOfflineList.size() > 300) {
-									executeSaveForOffline();
-								}
-								if (userSetting.getDeviceStatusNotify()) {
-									// 鍙戦�乺edis娑堟伅
-									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
-								}
-							}
-							break;
-						case CatalogEvent.VLOST:
-							// 瑙嗛涓㈠け
-							logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
-								logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							}else {
-								updateChannelOfflineList.add(channel);
-								if (updateChannelOfflineList.size() > 300) {
-									executeSaveForOffline();
-								}
-								if (userSetting.getDeviceStatusNotify()) {
-									// 鍙戦�乺edis娑堟伅
-									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
-								}
-							}
-							break;
-						case CatalogEvent.DEFECT:
-							// 鏁呴殰
-							logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
-								logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							}else {
-								updateChannelOfflineList.add(channel);
-								if (updateChannelOfflineList.size() > 300) {
-									executeSaveForOffline();
-								}
-								if (userSetting.getDeviceStatusNotify()) {
-									// 鍙戦�乺edis娑堟伅
-									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
-								}
-							}
-							break;
-						case CatalogEvent.ADD:
-							// 澧炲姞
-							logger.info("[鏀跺埌澧炲姞閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							// 鍒ゆ柇姝ら�氶亾鏄惁瀛樺湪
-							DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
-							if (deviceChannel != null) {
-								logger.info("[澧炲姞閫氶亾] 宸插瓨鍦紝涓嶅彂閫侀�氱煡鍙洿鏂帮紝璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-								channel.setId(deviceChannel.getId());
-								updateChannelMap.put(channel.getChannelId(), channel);
-								if (updateChannelMap.keySet().size() > 300) {
-									executeSaveForUpdate();
-								}
-							}else {
-								addChannelMap.put(channel.getChannelId(), channel);
-								if (userSetting.getDeviceStatusNotify()) {
-									// 鍙戦�乺edis娑堟伅
-									redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
-								}
-
-								if (addChannelMap.keySet().size() > 300) {
-									executeSaveForAdd();
-								}
-							}
-
-							break;
-						case CatalogEvent.DEL:
-							// 鍒犻櫎
-							logger.info("[鏀跺埌鍒犻櫎閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							deleteChannelList.add(channel);
-							if (userSetting.getDeviceStatusNotify()) {
-								// 鍙戦�乺edis娑堟伅
-								redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
-							}
-							if (deleteChannelList.size() > 300) {
-								executeSaveForDelete();
-							}
-							break;
-						case CatalogEvent.UPDATE:
-							// 鏇存柊
-							logger.info("[鏀跺埌鏇存柊閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							// 鍒ゆ柇姝ら�氶亾鏄惁瀛樺湪
-							DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
-							if (deviceChannelForUpdate != null) {
-								channel.setId(deviceChannelForUpdate.getId());
-								channel.setUpdateTime(DateUtil.getNow());
-								updateChannelMap.put(channel.getChannelId(), channel);
-								if (updateChannelMap.keySet().size() > 300) {
-									executeSaveForUpdate();
-								}
-							}else {
-								addChannelMap.put(channel.getChannelId(), channel);
-								if (addChannelMap.keySet().size() > 300) {
-									executeSaveForAdd();
-								}
-								if (userSetting.getDeviceStatusNotify()) {
-									// 鍙戦�乺edis娑堟伅
-									redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
-								}
-							}
-							break;
-						default:
-							logger.warn("[ NotifyCatalog ] event not found 锛� {}", event );
-
-					}
-					// 杞彂鍙樺寲淇℃伅
-					eventPublisher.catalogEventPublish(null, channel, event);
-
-					if (!updateChannelMap.keySet().isEmpty()
-							|| !addChannelMap.keySet().isEmpty()
-							|| !updateChannelOnlineList.isEmpty()
-							|| !updateChannelOfflineList.isEmpty()
-							|| !deleteChannelList.isEmpty()) {
-
-						if (!dynamicTask.contains(talkKey)) {
-							dynamicTask.startDelay(talkKey, this::executeSave, 1000);
+					// 閬嶅巻DeviceList
+					while (deviceListIterator.hasNext()) {
+						Element itemDevice = deviceListIterator.next();
+						Element eventElement = itemDevice.element("Event");
+						String event;
+						if (eventElement == null) {
+							logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" ));
+							event = CatalogEvent.ADD;
+						}else {
+							event = eventElement.getText().toUpperCase();
 						}
+						DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
+						if (channel == null) {
+							logger.info("[鏀跺埌鐩綍璁㈤槄]锛氫絾鏄В鏋愬け璐� {}", new String(evt.getRequest().getRawContent()));
+							continue;
+						}
+						if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
+							channel.setParentId(null);
+						}
+						channel.setDeviceId(device.getDeviceId());
+						logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}, {}/{}",event, device.getDeviceId(), channel.getChannelId());
+						switch (event) {
+							case CatalogEvent.ON:
+								// 涓婄嚎
+								deviceChannelService.online(channel);
+								if (userSetting.getDeviceStatusNotify()) {
+									// 鍙戦�乺edis娑堟伅
+									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
+								}
+
+								break;
+							case CatalogEvent.OFF :
+							case CatalogEvent.VLOST:
+							case CatalogEvent.DEFECT:
+								// 绂荤嚎
+								if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
+									logger.info("[鐩綍璁㈤槄] 绂荤嚎 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
+								}else {
+									deviceChannelService.offline(channel);
+									if (userSetting.getDeviceStatusNotify()) {
+										// 鍙戦�乺edis娑堟伅
+										redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
+									}
+								}
+								break;
+							case CatalogEvent.DEL:
+								// 鍒犻櫎
+								deviceChannelService.delete(channel);
+								if (userSetting.getDeviceStatusNotify()) {
+									// 鍙戦�乺edis娑堟伅
+									redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
+								}
+								break;
+							case CatalogEvent.ADD:
+							case CatalogEvent.UPDATE:
+								// 鏇存柊
+								channel.setUpdateTime(DateUtil.getNow());
+								deviceChannelService.updateChannel(deviceId,channel);
+								if (userSetting.getDeviceStatusNotify()) {
+									// 鍙戦�乺edis娑堟伅
+									redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
+								}
+								break;
+							default:
+								logger.warn("[ NotifyCatalog ] event not found 锛� {}", event );
+
+						}
+						// 杞彂鍙樺寲淇℃伅
+						eventPublisher.catalogEventPublish(null, channel, event);
 					}
 				}
+			} catch (DocumentException e) {
+				logger.error("鏈鐞嗙殑寮傚父 ", e);
 			}
-		} catch (DocumentException e) {
-			logger.error("鏈鐞嗙殑寮傚父 ", e);
 		}
 	}
-
-	private void executeSave(){
-		try {
-			executeSaveForAdd();
-		} catch (Exception e) {
-			logger.error("[瀛樺偍鏀跺埌鐨勫鍔犻�氶亾] 寮傚父锛� ", e );
-		}
-		try {
-			executeSaveForUpdate();
-		} catch (Exception e) {
-			logger.error("[瀛樺偍鏀跺埌鐨勬洿鏂伴�氶亾] 寮傚父锛� ", e );
-		}
-		try {
-			executeSaveForDelete();
-		} catch (Exception e) {
-			logger.error("[瀛樺偍鏀跺埌鐨勫垹闄ら�氶亾] 寮傚父锛� ", e );
-		}
-		try {
-			executeSaveForOnline();
-		} catch (Exception e) {
-			logger.error("[瀛樺偍鏀跺埌鐨勯�氶亾涓婄嚎] 寮傚父锛� ", e );
-		}
-		try {
-			executeSaveForOffline();
-		} catch (Exception e) {
-			logger.error("[瀛樺偍鏀跺埌鐨勯�氶亾绂荤嚎] 寮傚父锛� ", e );
-		}
-		dynamicTask.stop(talkKey);
-	}
-
-	private void executeSaveForUpdate(){
-		if (!updateChannelMap.values().isEmpty()) {
-			ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
-			updateChannelMap.clear();
-			deviceChannelService.batchUpdateChannel(deviceChannels);
-		}
-
-	}
-
-	private void executeSaveForAdd(){
-		if (!addChannelMap.values().isEmpty()) {
-			ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values());
-			addChannelMap.clear();
-			deviceChannelService.batchAddChannel(deviceChannels);
-		}
-	}
-
-	private void executeSaveForDelete(){
-		if (!deleteChannelList.isEmpty()) {
-			deviceChannelService.deleteChannels(deleteChannelList);
-			deleteChannelList.clear();
-		}
-	}
-
-	private void executeSaveForOnline(){
-		if (!updateChannelOnlineList.isEmpty()) {
-			deviceChannelService.channelsOnline(updateChannelOnlineList);
-			updateChannelOnlineList.clear();
-		}
-	}
-
-	private void executeSaveForOffline(){
-		if (!updateChannelOfflineList.isEmpty()) {
-			deviceChannelService.channelsOffline(updateChannelOfflineList);
-			updateChannelOfflineList.clear();
-		}
-	}
-
 }

--
Gitblit v1.8.0