From 588b1da35a1a51ddfca76fb8ca9c2c0c6cd70038 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期一, 13 五月 2024 11:12:20 +0800 Subject: [PATCH] Merge branch 'refs/heads/2.7.0' into 271-优化notify存储 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java | 294 ++++++++++++++++++++++++++++++---------------------------- 1 files changed, 151 insertions(+), 143 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 5157506..013d95e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -1,17 +1,15 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.genersoft.iot.vmp.conf.CivilCodeFileConf; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.SipConfig; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -20,7 +18,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; @@ -29,7 +29,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ConcurrentLinkedQueue; /** * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰涓殑绉诲姩浣嶇疆璇锋眰澶勭悊 @@ -40,10 +40,7 @@ private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class); - private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); - - private final List<MobilePosition> addMobilePositionList = new CopyOnWriteArrayList(); - + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Autowired private UserSetting userSetting; @@ -57,158 +54,169 @@ @Autowired private IDeviceChannelService deviceChannelService; - @Autowired - private DynamicTask dynamicTask; - - @Autowired - private CivilCodeFileConf civilCodeFileConf; - - @Autowired - private SipConfig sipConfig; - - private final static String talkKey = "notify-request-for-mobile-position-task"; - public void process(RequestEvent evt) { - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - // 鍥炲 200 OK - Element rootElement = getRootElement(evt); - if (rootElement == null) { - logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒版秷鎭綋,{}", evt.getRequest()); - return; + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + logger.error("[notify-绉诲姩浣嶇疆] 寰呭鐞嗘秷鎭槦鍒楀凡婊� {}锛岃繑鍥�486 BUSY_HERE锛屾秷鎭笉鍋氬鐞�", userSetting.getMaxNotifyCountQueue()); + return; + } + taskQueue.offer(new HandlerCatchData(evt, null, null)); + } + + @Scheduled(fixedRate = 200) //姣�200姣鎵ц涓�娆� + @Transactional + public void executeTaskQueue() { + if (taskQueue.isEmpty()) { + return; + } + Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); + List<MobilePosition> addMobilePositionList = new ArrayList<>(); + for (HandlerCatchData take : taskQueue) { + if (take == null) { + continue; } - - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setCreateTime(DateUtil.getNow()); - - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getTextTrim().toString(); - Device device = redisCatchStorage.getDevice(deviceId); - - if (device == null) { - device = redisCatchStorage.getDevice(channelId); + RequestEvent evt = take.getEvt(); + try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + long startTime = System.currentTimeMillis(); + // 鍥炲 200 OK + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒版秷鎭綋,{}", evt.getRequest()); + return; + } + Device device = redisCatchStorage.getDevice(deviceId); if (device == null) { - // 鏍规嵁閫氶亾id鏌ヨ璁惧Id - List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId); - if (deviceList.size() > 0) { - device = deviceList.get(0); + logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒癲evice,{}", deviceId); + return; + } + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setDeviceId(device.getDeviceId()); + mobilePosition.setDeviceName(device.getName()); + mobilePosition.setCreateTime(DateUtil.getNow()); + List<Element> elements = rootElement.elements(); + for (Element element : elements) { + switch (element.getName()){ + case "DeviceID": + String channelId = element.getStringValue(); + if (!deviceId.equals(channelId)) { + mobilePosition.setChannelId(channelId); + } + continue; + case "Time": + String timeVal = element.getStringValue(); + if (ObjectUtils.isEmpty(timeVal)) { + mobilePosition.setTime(DateUtil.getNow()); + } else { + mobilePosition.setTime(SipUtils.parseTime(timeVal)); + } + continue; + case "Longitude": + mobilePosition.setLongitude(Double.parseDouble(element.getStringValue())); + continue; + case "Latitude": + mobilePosition.setLatitude(Double.parseDouble(element.getStringValue())); + continue; + case "Speed": + String speedVal = element.getStringValue(); + if (NumericUtil.isDouble(speedVal)) { + mobilePosition.setSpeed(Double.parseDouble(speedVal)); + } else { + mobilePosition.setSpeed(0.0); + } + continue; + case "Direction": + String directionVal = element.getStringValue(); + if (NumericUtil.isDouble(directionVal)) { + mobilePosition.setDirection(Double.parseDouble(directionVal)); + } else { + mobilePosition.setDirection(0.0); + } + continue; + case "Altitude": + String altitudeVal = element.getStringValue(); + if (NumericUtil.isDouble(altitudeVal)) { + mobilePosition.setAltitude(Double.parseDouble(altitudeVal)); + } else { + mobilePosition.setAltitude(0.0); + } + continue; + } } - } - if (device == null) { - logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId); - return; - } - // 鍏煎璁惧閮ㄥ垎璁惧涓婃姤鏄�氶亾缂栧彿涓庤澶囩紪鍙蜂竴鑷寸殑鎯呭喌 - if (deviceId.equals(channelId)) { - List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); - if (deviceChannels.size() == 1) { - channelId = deviceChannels.get(0).getChannelId(); + +// logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}, 鏃堕棿锛� {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), +// mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); + mobilePosition.setReportSource("Mobile Position"); + + // 鏇存柊device channel 鐨勭粡绾害 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); + updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel); + addMobilePositionList.add(mobilePosition); + + + // 鍚戝叧鑱斾簡璇ラ�氶亾骞朵笖寮�鍚Щ鍔ㄤ綅缃闃呯殑涓婄骇骞冲彴鍙戦�佺Щ鍔ㄤ綅缃闃呮秷鎭� + try { + eventPublisher.mobilePositionEventPublish(mobilePosition); + }catch (Exception e) { + logger.error("[鍚戜笂绾ц浆鍙戠Щ鍔ㄤ綅缃け璐 ", e); } - } - if (!ObjectUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); - } - mobilePosition.setDeviceName(device.getName()); - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setChannelId(channelId); - String time = XmlUtil.getText(rootElement, "Time"); - if (ObjectUtils.isEmpty(time)) { - mobilePosition.setTime(DateUtil.getNow()); - } else { - mobilePosition.setTime(SipUtils.parseTime(time)); - } - - mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); - mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) { - mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed"))); - } else { - mobilePosition.setSpeed(0.0); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) { - mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction"))); - } else { - mobilePosition.setDirection(0.0); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) { - mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude"))); - } else { - mobilePosition.setAltitude(0.0); - } - logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), - mobilePosition.getLongitude(), mobilePosition.getLatitude()); - mobilePosition.setReportSource("Mobile Position"); - - // 鏇存柊device channel 鐨勭粡绾害 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setChannelId(channelId); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - updateChannelMap.put(channelId, deviceChannel); - addMobilePositionList.add(mobilePosition); - if(updateChannelMap.size() > 300) { - executeSaveChannel(); - } - if (userSetting.isSavePositionHistory()) { - if(addMobilePositionList.size() > 300) { - executeSaveMobilePosition(); + if (mobilePosition.getChannelId() == null || mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) ) { + List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId()); + channels.forEach(channel -> { + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); + jsonObject.put("serial", channel.getDeviceId()); + jsonObject.put("code", channel.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); + }); + }else { + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); + jsonObject.put("serial", mobilePosition.getDeviceId()); + jsonObject.put("code", mobilePosition.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); } + } catch (DocumentException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); } - -// deviceChannel = deviceChannelService.updateGps(deviceChannel, device); -// -// mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); -// mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); -// mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); -// mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - -// deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); - - if (!dynamicTask.contains(talkKey)) { - dynamicTask.startDelay(talkKey, this::executeSave, 1000); - } - - } catch (DocumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); } - - - } - - private void executeSave(){ - executeSaveChannel(); - executeSaveMobilePosition(); - dynamicTask.stop(talkKey); - } - - private void executeSaveChannel(){ - try { - logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", updateChannelMap.size()); - ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); - deviceChannelService.batchUpdateChannelGPS(deviceChannels); + taskQueue.clear(); + if(!updateChannelMap.isEmpty()) { + List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values()); + logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", channels.size()); + deviceChannelService.batchUpdateChannel(channels); updateChannelMap.clear(); - }catch (Exception e) { - } - } - - private void executeSaveMobilePosition(){ - if (userSetting.isSavePositionHistory()) { + if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) { try { logger.info("[绉诲姩浣嶇疆璁㈤槄] 娣诲姞閫氶亾杞ㄨ抗鐐逛綅锛� {}", addMobilePositionList.size()); deviceChannelService.batchAddMobilePosition(addMobilePositionList); - addMobilePositionList.clear(); }catch (Exception e) { logger.info("[绉诲姩浣嶇疆璁㈤槄] b娣诲姞閫氶亾杞ㄨ抗鐐逛綅淇濆瓨澶辫触锛� {}", addMobilePositionList.size()); } + addMobilePositionList.clear(); } } - - - + @Scheduled(fixedRate = 10000) + public void execute(){ + logger.info("[寰呭鐞哊otify-绉诲姩浣嶇疆璁㈤槄娑堟伅鏁伴噺]: {}", taskQueue.size()); + } } -- Gitblit v1.8.0