| | |
| | | import com.genersoft.iot.vmp.conf.UserSetting; |
| | | import com.genersoft.iot.vmp.gb28181.bean.Device; |
| | | import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; |
| | | import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; |
| | | import com.genersoft.iot.vmp.gb28181.event.EventPublisher; |
| | | import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; |
| | |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.scheduling.annotation.Scheduled; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.transaction.annotation.Transactional; |
| | | |
| | |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | |
| | | /** |
| | |
| | | |
| | | private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>(); |
| | | private final List<DeviceChannel> deleteChannelList = new CopyOnWriteArrayList<>(); |
| | | |
| | | private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); |
| | | |
| | | @Autowired |
| | | private UserSetting userSetting; |
| | |
| | | private SipConfig sipConfig; |
| | | |
| | | @Transactional |
| | | public void process(List<RequestEvent> evtList) { |
| | | if (evtList.isEmpty()) { |
| | | public void process(RequestEvent evt) { |
| | | if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { |
| | | logger.error("[notify-目录订阅] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); |
| | | return; |
| | | } |
| | | for (RequestEvent evt : evtList) { |
| | | taskQueue.offer(new HandlerCatchData(evt, null, null)); |
| | | } |
| | | |
| | | @Scheduled(fixedRate = 400) //每400毫秒执行一次 |
| | | public void executeTaskQueue(){ |
| | | if (taskQueue.isEmpty()) { |
| | | return; |
| | | } |
| | | for (HandlerCatchData take : taskQueue) { |
| | | if (take == null) { |
| | | continue; |
| | | } |
| | | RequestEvent evt = take.getEvt(); |
| | | try { |
| | | long start = System.currentTimeMillis(); |
| | | FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); |
| | |
| | | logger.error("未处理的异常 ", e); |
| | | } |
| | | } |
| | | taskQueue.clear(); |
| | | if (!updateChannelMap.keySet().isEmpty() |
| | | || !addChannelMap.keySet().isEmpty() |
| | | || !updateChannelOnlineList.isEmpty() |
| | |
| | | updateChannelOfflineList.clear(); |
| | | } |
| | | } |
| | | |
| | | @Scheduled(fixedRate = 10000) //每1秒执行一次 |
| | | public void execute(){ |
| | | logger.info("[待处理Notify-目录订阅消息数量]: {}", taskQueue.size()); |
| | | } |
| | | } |