xiaoxie
2022-05-23 d739bfa5976e36ced26f906ab16f83c20c8cb27c
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
@@ -1,10 +1,12 @@
package com.genersoft.iot.vmp.gb28181.transmit;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.RegisterRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd.KeepaliveNotifyMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.ISIPResponseProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.timeout.ITimeoutProcessor;
import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -12,11 +14,13 @@
import org.springframework.stereotype.Component;
import javax.sip.*;
import javax.sip.header.CSeqHeader;
import javax.sip.header.CallIdHeader;
import javax.sip.header.Header;
import javax.sip.address.SipURI;
import javax.sip.address.URI;
import javax.sip.header.*;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -36,9 +40,11 @@
    @Autowired
    private SipSubscribe sipSubscribe;
//    @Autowired
//    @Qualifier(value = "taskExecutor")
//    private ThreadPoolTaskExecutor poolTaskExecutor;
    @Autowired
    private EventPublisher eventPublisher;
    /**
     * 添加 request订阅
@@ -63,7 +69,7 @@
     * @param processor 处理程序
     */
    public void addTimeoutProcessor(ITimeoutProcessor processor) {
        this.timeoutProcessor = processor;
        timeoutProcessor = processor;
    }
    /**
@@ -73,6 +79,7 @@
    @Override
    @Async
    public void processRequest(RequestEvent requestEvent) {
        logger.debug("\n收到请求:\n{}", requestEvent.getRequest());
        String method = requestEvent.getRequest().getMethod();
        ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method);
        if (sipRequestProcessor == null) {
@@ -90,25 +97,25 @@
    @Override
    @Async
    public void processResponse(ResponseEvent responseEvent) {
        logger.debug(responseEvent.getResponse().toString());
        Response response = responseEvent.getResponse();
        logger.debug(responseEvent.getResponse().toString());
        logger.debug("\n收到响应:\n{}", responseEvent.getResponse());
        int status = response.getStatusCode();
        if (((status >= 200) && (status < 300)) || status == 401) { // Success!
        if (((status >= 200) && (status < 300)) || status == Response.UNAUTHORIZED) { // Success!
            CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
            String method = cseqHeader.getMethod();
            ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
            if (sipRequestProcessor != null) {
                sipRequestProcessor.process(responseEvent);
            }
            if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
            if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
                CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
                if (callIdHeader != null) {
                    SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
                    if (subscribe != null) {
                        SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
                        subscribe.response(eventResult);
                        sipSubscribe.removeOkSubscribe(callIdHeader.getCallId());
                        subscribe.response(eventResult);
                    }
                }
            }
@@ -141,9 +148,32 @@
     */
    @Override
    public void processTimeout(TimeoutEvent timeoutEvent) {
        if(timeoutProcessor != null) {
            timeoutProcessor.process(timeoutEvent);
        logger.info("[消息发送超时]");
        ClientTransaction clientTransaction = timeoutEvent.getClientTransaction();
        eventPublisher.requestTimeOut(timeoutEvent);
        if (clientTransaction != null) {
            Request request = clientTransaction.getRequest();
            if (request != null) {
                CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
                if (callIdHeader != null) {
                    SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
                    SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(timeoutEvent);
                    subscribe.response(eventResult);
                    sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId());
                }
            }
        }
//        Timeout timeout = timeoutEvent.getTimeout();
//        ServerTransaction serverTransaction = timeoutEvent.getServerTransaction();
//        if (serverTransaction != null) {
//            Request request = serverTransaction.getRequest();
//            URI requestURI = request.getRequestURI();
//            Header header = request.getHeader(FromHeader.NAME);
//        }
//        if(timeoutProcessor != null) {
//            timeoutProcessor.process(timeoutEvent);
//        }
    }
    @Override