From 30ae9e929fad80f624ab632c53081db3d2dc9aec Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 25 五月 2023 17:28:57 +0800 Subject: [PATCH] 合并主线 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java | 281 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 281 insertions(+), 0 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java new file mode 100644 index 0000000..b9a41a5 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -0,0 +1,281 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; + +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import com.genersoft.iot.vmp.service.IDeviceChannelService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import org.dom4j.DocumentException; +import org.dom4j.Element; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.sip.RequestEvent; +import javax.sip.header.FromHeader; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰涓殑鐩綍璇锋眰澶勭悊 + */ +@Component +public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent { + + + private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class); + + private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>(); + private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>(); + private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); + + private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>(); + private final List<DeviceChannel> deleteChannelList = new CopyOnWriteArrayList<>(); + + + @Autowired + private UserSetting userSetting; + + @Autowired + private EventPublisher eventPublisher; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private IDeviceChannelService deviceChannelService; + + @Autowired + private DynamicTask dynamicTask; + + private final static String talkKey = "notify-request-for-catalog-task"; + + public void process(RequestEvent evt) { + try { + long start = System.currentTimeMillis(); + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null || device.getOnline() == 0) { + logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" )); + return; + } + Element rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest()); + return; + } + Element deviceListElement = rootElement.element("DeviceList"); + if (deviceListElement == null) { + return; + } + Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { + + // 閬嶅巻DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element channelDeviceElement = itemDevice.element("DeviceID"); + if (channelDeviceElement == null) { + continue; + } + Element eventElement = itemDevice.element("Event"); + String event; + if (eventElement == null) { + logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" )); + event = CatalogEvent.ADD; + }else { + event = eventElement.getText().toUpperCase(); + } + DeviceChannel channel = XmlUtil.channelContentHander(itemDevice, device, event); + + channel.setDeviceId(device.getDeviceId()); + logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}/{}", device.getDeviceId(), channel.getChannelId()); + switch (event) { + case CatalogEvent.ON: + // 涓婄嚎 + logger.info("[鏀跺埌閫氶亾涓婄嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + updateChannelOnlineList.add(channel); + if (updateChannelOnlineList.size() > 300) { + executeSaveForOnline(); + } + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); + } + + break; + case CatalogEvent.OFF : + // 绂荤嚎 + logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + }else { + updateChannelOfflineList.add(channel); + if (updateChannelOfflineList.size() > 300) { + executeSaveForOffline(); + } + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.VLOST: + // 瑙嗛涓㈠け + logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + }else { + updateChannelOfflineList.add(channel); + if (updateChannelOfflineList.size() > 300) { + executeSaveForOffline(); + } + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.DEFECT: + // 鏁呴殰 + logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + }else { + updateChannelOfflineList.add(channel); + if (updateChannelOfflineList.size() > 300) { + executeSaveForOffline(); + } + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.ADD: + // 澧炲姞 + logger.info("[鏀跺埌澧炲姞閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + // 鍒ゆ柇姝ら�氶亾鏄惁瀛樺湪 + DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId()); + if (deviceChannel != null) { + channel.setId(deviceChannel.getId()); + updateChannelMap.put(channel.getChannelId(), channel); + if (updateChannelMap.keySet().size() > 300) { + executeSaveForUpdate(); + } + }else { + addChannelMap.put(channel.getChannelId(), channel); + if (addChannelMap.keySet().size() > 300) { + executeSaveForAdd(); + } + } + + break; + case CatalogEvent.DEL: + // 鍒犻櫎 + logger.info("[鏀跺埌鍒犻櫎閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + deleteChannelList.add(channel); + if (deleteChannelList.size() > 300) { + executeSaveForDelete(); + } + break; + case CatalogEvent.UPDATE: + // 鏇存柊 + logger.info("[鏀跺埌鏇存柊閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + // 鍒ゆ柇姝ら�氶亾鏄惁瀛樺湪 + DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId()); + if (deviceChannelForUpdate != null) { + channel.setId(deviceChannelForUpdate.getId()); + updateChannelMap.put(channel.getChannelId(), channel); + if (updateChannelMap.keySet().size() > 300) { + executeSaveForUpdate(); + } + }else { + addChannelMap.put(channel.getChannelId(), channel); + if (addChannelMap.keySet().size() > 300) { + executeSaveForAdd(); + } + } + break; + default: + logger.warn("[ NotifyCatalog ] event not found 锛� {}", event ); + + } + // 杞彂鍙樺寲淇℃伅 + eventPublisher.catalogEventPublish(null, channel, event); + + if (updateChannelMap.keySet().size() > 0 + || addChannelMap.keySet().size() > 0 + || updateChannelOnlineList.size() > 0 + || updateChannelOfflineList.size() > 0 + || deleteChannelList.size() > 0) { + + if (!dynamicTask.contains(talkKey)) { + dynamicTask.startDelay(talkKey, this::executeSave, 1000); + } + } + } + } + } catch (DocumentException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } + } + + private void executeSave(){ + System.out.println("瀹氭椂瀛樺偍鏁版嵁"); + executeSaveForUpdate(); + executeSaveForDelete(); + executeSaveForOnline(); + executeSaveForOffline(); + dynamicTask.stop(talkKey); + } + + private void executeSaveForUpdate(){ + if (updateChannelMap.values().size() > 0) { + ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); + updateChannelMap.clear(); + deviceChannelService.batchUpdateChannel(deviceChannels); + } + + } + + private void executeSaveForAdd(){ + if (addChannelMap.values().size() > 0) { + ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values()); + addChannelMap.clear(); + deviceChannelService.batchAddChannel(deviceChannels); + } + } + + private void executeSaveForDelete(){ + if (deleteChannelList.size() > 0) { + deviceChannelService.deleteChannels(deleteChannelList); + deleteChannelList.clear(); + } + } + + private void executeSaveForOnline(){ + if (updateChannelOnlineList.size() > 0) { + deviceChannelService.channelsOnline(updateChannelOnlineList); + updateChannelOnlineList.clear(); + } + } + + private void executeSaveForOffline(){ + if (updateChannelOfflineList.size() > 0) { + deviceChannelService.channelsOffline(updateChannelOfflineList); + updateChannelOfflineList.clear(); + } + } + +} -- Gitblit v1.8.0