| | |
| | | import com.genersoft.iot.vmp.conf.UserSetting; |
| | | import com.genersoft.iot.vmp.gb28181.bean.Device; |
| | | import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; |
| | | import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; |
| | | import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; |
| | | import com.genersoft.iot.vmp.gb28181.event.EventPublisher; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; |
| | |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.scheduling.annotation.Scheduled; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.transaction.annotation.Transactional; |
| | | import org.springframework.util.ObjectUtils; |
| | |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | |
| | | /** |
| | | * SIP命令类型: NOTIFY请求中的移动位置请求处理 |
| | |
| | | |
| | | private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class); |
| | | |
| | | private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); |
| | | |
| | | @Autowired |
| | | private UserSetting userSetting; |
| | |
| | | @Autowired |
| | | private IDeviceChannelService deviceChannelService; |
| | | |
/**
 * Queues an incoming request event for later handling instead of processing it inline.
 *
 * The event is wrapped in a HandlerCatchData (the other two constructor arguments are
 * null) and offered to taskQueue. If taskQueue already holds
 * userSetting.getMaxNotifyCountQueue() entries or more, the event is dropped and an
 * error is logged.
 *
 * NOTE(review): the log text mentions returning 486 BUSY_HERE, but this method sends no
 * response on that path; it only returns. Confirm whether a reply is expected here.
 *
 * @param evt the request event to enqueue
 */
| | | public void process(RequestEvent evt) { |
| | | |
// Capacity guard: refuse new work once the queue has reached the configured maximum.
| | | if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { |
| | | logger.error("[notify-移动位置] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); |
| | | return; |
| | | } |
| | | taskQueue.offer(new HandlerCatchData(evt, null, null)); |
| | | } |
| | | |
/**
 * Scheduled task (fixedRate = 200) that walks taskQueue, reads each queued request event,
 * clears the queue, and then hands any accumulated channel updates to
 * deviceChannelService.
 *
 * NOTE(review): this span looks like a rendered diff. Lines that appear to belong to a
 * previous version (a process(List<RequestEvent>) signature and its loop header) are
 * interleaved with the executeTaskQueue() version, and parts of the body are not visible.
 * The comments below describe only what is visible.
 */
| | | @Scheduled(fixedRate = 200) //runs every 200 ms |
| | | @Transactional |
// NOTE(review): the next two lines appear to be the removed side of the diff
// (an older signature that received the events as a list).
| | | public void process(List<RequestEvent> eventList) { |
| | | if (eventList.isEmpty()) { |
| | | public void executeTaskQueue() { |
// Nothing queued: skip this run.
| | | if (taskQueue.isEmpty()) { |
| | | return; |
| | | } |
// Per-run accumulators. The code that fills them is not visible here; presumably the map
// key identifies a channel so repeated updates collapse to one entry - TODO confirm.
| | | Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); |
| | | List<MobilePosition> addMobilePositionList = new ArrayList<>(); |
// NOTE(review): the next line appears to be the older loop header; the one after it
// iterates taskQueue in place without removing entries.
| | | for (RequestEvent evt : eventList) { |
| | | for (HandlerCatchData take : taskQueue) { |
// Defensive skip of null entries.
| | | if (take == null) { |
| | | continue; |
| | | } |
| | | RequestEvent evt = take.getEvt(); |
| | | try { |
// Derive deviceId from the request's From header.
| | | FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); |
| | | String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); |
| | |
// NOTE(review): the rest of the try body and the catch header are not visible here;
// this log call presumably sits in a catch block that binds e.
| | | logger.error("未处理的异常 ", e); |
| | | } |
| | | } |
// NOTE(review): clear() runs after the loop has finished. An event offered by
// process(RequestEvent) between the end of the iteration and this call would be removed
// without having been handled. Consider draining with poll() instead - confirm against
// the threading model of the callers.
| | | taskQueue.clear(); |
// Pass accumulated channel updates, if any, to both batch update calls on the service.
| | | if(!updateChannelMap.isEmpty()) { |
| | | List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values()); |
| | | logger.info("[移动位置订阅]更新通道位置: {}", channels.size()); |
| | | deviceChannelService.batchUpdateChannelGPS(channels); |
| | | deviceChannelService.batchUpdateChannel(channels); |
| | | updateChannelMap.clear(); |
| | | } |
// Runs only when userSetting.isSavePositionHistory() is true and records were collected.
// NOTE(review): the statement that stores addMobilePositionList is not visible here.
| | | if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) { |
| | |
| | | addMobilePositionList.clear(); |
| | | } |
| | | } |
/**
 * Scheduled task (fixedRate = 10000) that logs the current size of taskQueue at INFO level.
 * It does not modify the queue.
 */
| | | @Scheduled(fixedRate = 10000) |
| | | public void execute(){ |
| | | logger.info("[待处理Notify-移动位置订阅消息数量]: {}", taskQueue.size()); |
| | | } |
| | | } |