From 764d04b497356ba6bcbb75fd42b51eca750f7223 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 29 五月 2024 15:02:51 +0800 Subject: [PATCH] 调整上级观看消息的发送 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java | 72 +++++++++++++++++------------------- 1 files changed, 34 insertions(+), 38 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 05a5336..52fc7a3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -4,12 +4,14 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.service.IDeviceChannelService; +import com.genersoft.iot.vmp.service.IMobilePositionService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; import org.dom4j.DocumentException; @@ -17,16 +19,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; /** * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰涓殑绉诲姩浣嶇疆璇锋眰澶勭悊 @@ -37,6 +37,7 @@ private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class); + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Autowired private UserSetting userSetting; @@ -50,14 +51,28 @@ @Autowired private IDeviceChannelService deviceChannelService; - @Transactional - public void process(List<RequestEvent> eventList) { - if (eventList.isEmpty()) { + @Autowired + private IMobilePositionService mobilePositionService; + + public void process(RequestEvent evt) { + + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + logger.error("[notify-绉诲姩浣嶇疆] 寰呭鐞嗘秷鎭槦鍒楀凡婊� {}锛岃繑鍥�486 BUSY_HERE锛屾秷鎭笉鍋氬鐞�", userSetting.getMaxNotifyCountQueue()); return; } - Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); - List<MobilePosition> addMobilePositionList = new ArrayList<>(); - for (RequestEvent evt : eventList) { + taskQueue.offer(new HandlerCatchData(evt, null, null)); + } + + @Scheduled(fixedRate = 200) //姣�200姣鎵ц涓�娆� + public void executeTaskQueue() { + if (taskQueue.isEmpty()) { + return; + } + for (HandlerCatchData take : taskQueue) { + if (take == null) { + continue; + } + RequestEvent evt = take.getEvt(); try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); @@ -128,27 +143,18 @@ } } -// logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}, 鏃堕棿锛� {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), -// mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); + logger.debug("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}, 鏃堕棿锛� {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), + mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); mobilePosition.setReportSource("Mobile Position"); - // 鏇存柊device channel 鐨勭粡绾害 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel); - addMobilePositionList.add(mobilePosition); - - + mobilePositionService.add(mobilePosition); // 鍚戝叧鑱斾簡璇ラ�氶亾骞朵笖寮�鍚Щ鍔ㄤ綅缃闃呯殑涓婄骇骞冲彴鍙戦�佺Щ鍔ㄤ綅缃闃呮秷鎭� try { eventPublisher.mobilePositionEventPublish(mobilePosition); }catch (Exception e) { logger.error("[鍚戜笂绾ц浆鍙戠Щ鍔ㄤ綅缃け璐 ", e); } - if (mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) || mobilePosition.getChannelId() == null) { + if (mobilePosition.getChannelId() == null || mobilePosition.getChannelId().equals(mobilePosition.getDeviceId())) { List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId()); channels.forEach(channel -> { // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� @@ -180,20 +186,10 @@ logger.error("鏈鐞嗙殑寮傚父 ", e); } } - if(!updateChannelMap.isEmpty()) { - List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values()); - logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", channels.size()); - deviceChannelService.batchUpdateChannelGPS(channels); - updateChannelMap.clear(); - } - if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) { - try { - logger.info("[绉诲姩浣嶇疆璁㈤槄] 娣诲姞閫氶亾杞ㄨ抗鐐逛綅锛� {}", addMobilePositionList.size()); - deviceChannelService.batchAddMobilePosition(addMobilePositionList); - }catch (Exception e) { - logger.info("[绉诲姩浣嶇疆璁㈤槄] b娣诲姞閫氶亾杞ㄨ抗鐐逛綅淇濆瓨澶辫触锛� {}", addMobilePositionList.size()); - } - addMobilePositionList.clear(); - } + taskQueue.clear(); + } + @Scheduled(fixedRate = 10000) + public void execute(){ + logger.info("[寰呭鐞哊otify-绉诲姩浣嶇疆璁㈤槄娑堟伅鏁伴噺]: {}", taskQueue.size()); } } -- Gitblit v1.8.0