From 72b437cc52e2907c9a66d6bda60acbffc736cdb8 Mon Sep 17 00:00:00 2001 From: BradyXu <brady_xu@outlook.com> Date: 星期日, 12 五月 2024 02:15:41 +0800 Subject: [PATCH] 修复当上级平台点播时,上级平台和下级平台局域网不通时,导致推流失败。推流目标IP地址改为配置的SDP发流IP,如果SDP发流IP存在的话。否则还是从连接中获取推流目标IP --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java | 33 ++++++++++++++++++++++++++++----- 1 files changed, 28 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 05a5336..a8c4af8 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; @@ -17,6 +18,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; @@ -27,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; /** * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰涓殑绉诲姩浣嶇疆璇锋眰澶勭悊 @@ -37,6 +40,7 @@ private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class); + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Autowired private UserSetting userSetting; @@ -50,14 +54,28 @@ @Autowired private IDeviceChannelService deviceChannelService; + public void process(RequestEvent evt) { + + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + logger.error("[notify-绉诲姩浣嶇疆] 寰呭鐞嗘秷鎭槦鍒楀凡婊� {}锛岃繑鍥�486 BUSY_HERE锛屾秷鎭笉鍋氬鐞�", userSetting.getMaxNotifyCountQueue()); + return; + } + taskQueue.offer(new HandlerCatchData(evt, null, null)); + } + + @Scheduled(fixedRate = 200) //姣�200姣鎵ц涓�娆� @Transactional - public void process(List<RequestEvent> eventList) { - if (eventList.isEmpty()) { + public void executeTaskQueue() { + if (taskQueue.isEmpty()) { return; } Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); List<MobilePosition> addMobilePositionList = new ArrayList<>(); - for (RequestEvent evt : eventList) { + for (HandlerCatchData take : taskQueue) { + if (take == null) { + continue; + } + RequestEvent evt = take.getEvt(); try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); @@ -148,7 +166,7 @@ }catch (Exception e) { logger.error("[鍚戜笂绾ц浆鍙戠Щ鍔ㄤ綅缃け璐 ", e); } - if (mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) || mobilePosition.getChannelId() == null) { + if (mobilePosition.getChannelId() == null || mobilePosition.getChannelId().equals(mobilePosition.getDeviceId())) { List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId()); channels.forEach(channel -> { // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� @@ -180,10 +198,11 @@ logger.error("鏈鐞嗙殑寮傚父 ", e); } } + taskQueue.clear(); if(!updateChannelMap.isEmpty()) { List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values()); logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", channels.size()); - deviceChannelService.batchUpdateChannelGPS(channels); + deviceChannelService.batchUpdateChannel(channels); updateChannelMap.clear(); } if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) { @@ -196,4 +215,8 @@ addMobilePositionList.clear(); } } + @Scheduled(fixedRate = 10000) + public void execute(){ + logger.info("[寰呭鐞哊otify-绉诲姩浣嶇疆璁㈤槄娑堟伅鏁伴噺]: {}", taskQueue.size()); + } } -- Gitblit v1.8.0