From 764d04b497356ba6bcbb75fd42b51eca750f7223 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 29 五月 2024 15:02:51 +0800 Subject: [PATCH] 调整上级观看消息的发送 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java | 204 +++++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 180 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index 4d26189..6185cda 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -1,9 +1,11 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; @@ -17,15 +19,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; /** @@ -37,11 +42,14 @@ private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class); + private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>(); + private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>(); private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>(); private final List<DeviceChannel> deleteChannelList = new CopyOnWriteArrayList<>(); + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Autowired private UserSetting userSetting; @@ -56,14 +64,30 @@ private IDeviceChannelService deviceChannelService; @Autowired + private DynamicTask dynamicTask; + + @Autowired private SipConfig sipConfig; @Transactional - public void process(List<RequestEvent> evtList) { - if (evtList.isEmpty()) { + public void process(RequestEvent evt) { + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + logger.error("[notify-鐩綍璁㈤槄] 寰呭鐞嗘秷鎭槦鍒楀凡婊� {}锛岃繑鍥�486 BUSY_HERE锛屾秷鎭笉鍋氬鐞�", userSetting.getMaxNotifyCountQueue()); return; } - for (RequestEvent evt : evtList) { + taskQueue.offer(new HandlerCatchData(evt, null, null)); + } + + @Scheduled(fixedRate = 400) //姣�400姣鎵ц涓�娆� + public void executeTaskQueue(){ + if (taskQueue.isEmpty()) { + return; + } + for (HandlerCatchData take : taskQueue) { + if (take == null) { + continue; + } + RequestEvent evt = take.getEvt(); try { long start = System.currentTimeMillis(); FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); @@ -71,7 +95,7 @@ Device device = redisCatchStorage.getDevice(deviceId); if (device == null || !device.isOnLine()) { - logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" )); + logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId() : "")); return; } Element rootElement = getRootElement(evt, device.getCharset()); @@ -92,9 +116,9 @@ Element eventElement = itemDevice.element("Event"); String event; if (eventElement == null) { - logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" )); + logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId() : "")); event = CatalogEvent.ADD; - }else { + } else { event = eventElement.getText().toUpperCase(); } DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); @@ -106,61 +130,193 @@ channel.setParentId(null); } channel.setDeviceId(device.getDeviceId()); - logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}, {}/{}",event, device.getDeviceId(), channel.getChannelId()); + logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}/{}", device.getDeviceId(), channel.getChannelId()); switch (event) { case CatalogEvent.ON: // 涓婄嚎 - deviceChannelService.online(channel); + logger.info("[鏀跺埌閫氶亾涓婄嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + updateChannelOnlineList.add(channel); if (userSetting.getDeviceStatusNotify()) { // 鍙戦�乺edis娑堟伅 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); } - break; - case CatalogEvent.OFF : - case CatalogEvent.VLOST: - case CatalogEvent.DEFECT: + case CatalogEvent.OFF: // 绂荤嚎 + logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[鐩綍璁㈤槄] 绂荤嚎 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - }else { - deviceChannelService.offline(channel); + logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + } else { + updateChannelOfflineList.add(channel); if (userSetting.getDeviceStatusNotify()) { // 鍙戦�乺edis娑堟伅 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); } } break; + case CatalogEvent.VLOST: + // 瑙嗛涓㈠け + logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + } else { + updateChannelOfflineList.add(channel); + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.DEFECT: + // 鏁呴殰 + logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + } else { + updateChannelOfflineList.add(channel); + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.ADD: + // 澧炲姞 + logger.info("[鏀跺埌澧炲姞閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + // 鍒ゆ柇姝ら�氶亾鏄惁瀛樺湪 + DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId()); + if (deviceChannel != null) { + logger.info("[澧炲姞閫氶亾] 宸插瓨鍦紝涓嶅彂閫侀�氱煡鍙洿鏂帮紝璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + channel.setId(deviceChannel.getId()); + channel.setHasAudio(null); + updateChannelMap.put(channel.getChannelId(), channel); + } else { + addChannelMap.put(channel.getChannelId(), channel); + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + } + } + + break; case CatalogEvent.DEL: // 鍒犻櫎 - deviceChannelService.delete(channel); + logger.info("[鏀跺埌鍒犻櫎閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + deleteChannelList.add(channel); if (userSetting.getDeviceStatusNotify()) { // 鍙戦�乺edis娑堟伅 redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); } break; - case CatalogEvent.ADD: case CatalogEvent.UPDATE: // 鏇存柊 - channel.setUpdateTime(DateUtil.getNow()); - channel.setHasAudio(null); - deviceChannelService.updateChannel(deviceId,channel); - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + logger.info("[鏀跺埌鏇存柊閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + // 鍒ゆ柇姝ら�氶亾鏄惁瀛樺湪 + DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId()); + if (deviceChannelForUpdate != null) { + channel.setId(deviceChannelForUpdate.getId()); + channel.setUpdateTime(DateUtil.getNow()); + channel.setHasAudio(null); + updateChannelMap.put(channel.getChannelId(), channel); + } else { + addChannelMap.put(channel.getChannelId(), channel); + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + } } break; default: - logger.warn("[ NotifyCatalog ] event not found 锛� {}", event ); + logger.warn("[ NotifyCatalog ] event not found 锛� {}", event); } // 杞彂鍙樺寲淇℃伅 eventPublisher.catalogEventPublish(null, channel, event); } } + } catch (DocumentException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); } } + taskQueue.clear(); + if (!updateChannelMap.keySet().isEmpty() + || !addChannelMap.keySet().isEmpty() + || !updateChannelOnlineList.isEmpty() + || !updateChannelOfflineList.isEmpty() + || !deleteChannelList.isEmpty()) { + executeSave(); + } + } + + public void executeSave(){ + try { + executeSaveForAdd(); + } catch (Exception e) { + logger.error("[瀛樺偍鏀跺埌鐨勫鍔犻�氶亾] 寮傚父锛� ", e ); + } + try { + executeSaveForOnline(); + } catch (Exception e) { + logger.error("[瀛樺偍鏀跺埌鐨勯�氶亾涓婄嚎] 寮傚父锛� ", e ); + } + try { + executeSaveForOffline(); + } catch (Exception e) { + logger.error("[瀛樺偍鏀跺埌鐨勯�氶亾绂荤嚎] 寮傚父锛� ", e ); + } + try { + executeSaveForUpdate(); + } catch (Exception e) { + logger.error("[瀛樺偍鏀跺埌鐨勬洿鏂伴�氶亾] 寮傚父锛� ", e ); + } + try { + executeSaveForDelete(); + } catch (Exception e) { + logger.error("[瀛樺偍鏀跺埌鐨勫垹闄ら�氶亾] 寮傚父锛� ", e ); + } + } + + private void executeSaveForUpdate(){ + if (!updateChannelMap.values().isEmpty()) { + logger.info("[瀛樺偍鏀跺埌鐨勬洿鏂伴�氶亾], 鏁伴噺锛� {}", updateChannelMap.size()); + ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); + deviceChannelService.batchUpdateChannel(deviceChannels); + updateChannelMap.clear(); + } + } + + private void executeSaveForAdd(){ + if (!addChannelMap.values().isEmpty()) { + ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values()); + addChannelMap.clear(); + deviceChannelService.batchAddChannel(deviceChannels); + } + } + + private void executeSaveForDelete(){ + if (!deleteChannelList.isEmpty()) { + deviceChannelService.deleteChannels(deleteChannelList); + deleteChannelList.clear(); + } + } + + private void executeSaveForOnline(){ + if (!updateChannelOnlineList.isEmpty()) { + deviceChannelService.channelsOnline(updateChannelOnlineList); + updateChannelOnlineList.clear(); + } + } + + private void executeSaveForOffline(){ + if (!updateChannelOfflineList.isEmpty()) { + deviceChannelService.channelsOffline(updateChannelOfflineList); + updateChannelOfflineList.clear(); + } + } + + @Scheduled(fixedRate = 10000) //姣�1绉掓墽琛屼竴娆� + public void execute(){ + logger.info("[寰呭鐞哊otify-鐩綍璁㈤槄娑堟伅鏁伴噺]: {}", taskQueue.size()); } } -- Gitblit v1.8.0