648540858
2022-11-29 ef6693aabbbf12e83d09ad8749f6e60faacc012d
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
@@ -9,7 +9,6 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceAlarmService;
@@ -27,17 +26,15 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.concurrent.ConcurrentLinkedQueue;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.*;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
/**
 * 报警事件的处理,参考:9.4
@@ -72,8 +69,6 @@
    @Autowired
    private IDeviceChannelService deviceChannelService;
    private boolean taskQueueHandlerRun = false;
    private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
@@ -89,20 +84,20 @@
    @Override
    public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
        logger.info("[收到报警通知]设备:{}", device.getDeviceId());
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
        // 回复200 OK
        try {
            responseAck((SIPRequest) evt.getRequest(), Response.OK);
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("[命令发送失败] 报警通知回复: {}", e.getMessage());
        }
        if (isEmpty) {
            taskExecutor.execute(() -> {
                logger.info("[处理报警通知]待处理数量:{}", taskQueue.size() );
                while (!taskQueue.isEmpty()) {
                    SipMsgInfo sipMsgInfo = taskQueue.poll();
                    // 回复200 OK
                    try {
                        responseAck((SIPRequest) sipMsgInfo.getEvt().getRequest(), Response.OK);
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[处理报警通知], 回复200OK失败", e);
                    }
                        SipMsgInfo sipMsgInfo = taskQueue.poll();
                    Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID");
                    String channelId = deviceIdElement.getText().toString();
@@ -205,12 +200,12 @@
                    if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) {
                        publisher.deviceAlarmEventPublish(deviceAlarm);
                    }
                    }catch (Exception e) {
                        logger.warn("[收到报警通知] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest());
                }
                taskQueueHandlerRun = false;
                }
            });
        }
    }
    @Override