648540858
2023-03-22 c73fe2b0cf2b0956b0f480aed2e5cb35ba446de5
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
@@ -1,21 +1,22 @@
package com.genersoft.iot.vmp.gb28181.transmit;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.ISIPResponseProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.timeout.ITimeoutProcessor;
import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.sip.*;
import javax.sip.header.CSeqHeader;
import javax.sip.header.CallIdHeader;
import javax.sip.header.*;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.net.InetAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -29,7 +30,7 @@
    private final static Logger logger = LoggerFactory.getLogger(SIPProcessorObserver.class);
    private static Map<String, ISIPRequestProcessor> requestProcessorMap = new ConcurrentHashMap<>();
    private static Map<String,  ISIPRequestProcessor> requestProcessorMap = new ConcurrentHashMap<>();
    private static Map<String, ISIPResponseProcessor> responseProcessorMap = new ConcurrentHashMap<>();
    private static ITimeoutProcessor timeoutProcessor;
@@ -37,8 +38,7 @@
    private SipSubscribe sipSubscribe;
    @Autowired
    @Qualifier(value = "taskExecutor")
    private ThreadPoolTaskExecutor poolTaskExecutor;
    private EventPublisher eventPublisher;
    /**
     * 添加 request订阅
@@ -63,7 +63,7 @@
     * @param processor 处理程序
     */
    public void addTimeoutProcessor(ITimeoutProcessor processor) {
        this.timeoutProcessor = processor;
        timeoutProcessor = processor;
    }
    /**
@@ -71,17 +71,16 @@
     * @param requestEvent RequestEvent事件
     */
    @Override
    @Async("taskExecutor")
    public void processRequest(RequestEvent requestEvent) {
        poolTaskExecutor.execute(() -> {
            String method = requestEvent.getRequest().getMethod();
            ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method);
            if (sipRequestProcessor == null) {
                logger.warn("不支持方法{}的request", method);
                return;
            }
            requestProcessorMap.get(method).process(requestEvent);
        });
        String method = requestEvent.getRequest().getMethod();
        ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method);
        if (sipRequestProcessor == null) {
            logger.warn("不支持方法{}的request", method);
            // TODO 回复错误玛
            return;
        }
        requestProcessorMap.get(method).process(requestEvent);
    }
@@ -90,55 +89,49 @@
     * @param responseEvent responseEvent事件
     */
    @Override
    @Async("taskExecutor")
    public void processResponse(ResponseEvent responseEvent) {
        logger.debug(responseEvent.getResponse().toString());
//        CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
//        String method = cseqHeader.getMethod();
//        ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
//        if (sipRequestProcessor == null) {
//            logger.warn("不支持方法{}的response", method);
//            return;
//        }
//        sipRequestProcessor.process(responseEvent);
        Response response = responseEvent.getResponse();
        int status = response.getStatusCode();
        poolTaskExecutor.execute(() -> {
            Response response = responseEvent.getResponse();
            logger.debug(responseEvent.getResponse().toString());
            int status = response.getStatusCode();
            if (((status >= 200) && (status < 300)) || status == 401) { // Success!
//            ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt);
                CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
                String method = cseqHeader.getMethod();
                ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
                if (sipRequestProcessor != null) {
                    sipRequestProcessor.process(responseEvent);
                }
                if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
                    CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
                    if (callIdHeader != null) {
                        SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
                        if (subscribe != null) {
                            SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
                            subscribe.response(eventResult);
                        }
                    }
                }
            } else if ((status >= 100) && (status < 200)) {
                // 增加其它无需回复的响应,如101、180等
            } else {
                logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/);
                if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
                    CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
                    if (callIdHeader != null) {
                        SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
                        if (subscribe != null) {
                            SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
                            subscribe.response(eventResult);
                        }
        // Success
        if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) {
            CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
            String method = cseqHeader.getMethod();
            ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
            if (sipRequestProcessor != null) {
                sipRequestProcessor.process(responseEvent);
            }
            if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
                CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
                if (callIdHeader != null) {
                    SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
                    if (subscribe != null) {
                        SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
                        sipSubscribe.removeOkSubscribe(callIdHeader.getCallId());
                        subscribe.response(eventResult);
                    }
                }
            }
        });
        } else if ((status >= Response.TRYING) && (status < Response.OK)) {
            // 增加其它无需回复的响应,如101、180等
        } else {
            logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase());
            if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
                CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
                if (callIdHeader != null) {
                    SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
                    if (subscribe != null) {
                        SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
                        subscribe.response(eventResult);
                        sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId());
                    }
                }
            }
            if (responseEvent.getDialog() != null) {
                responseEvent.getDialog().delete();
            }
        }
    }
@@ -149,17 +142,58 @@
     */
    @Override
    public void processTimeout(TimeoutEvent timeoutEvent) {
        if(timeoutProcessor != null) {
            timeoutProcessor.process(timeoutEvent);
        logger.info("[消息发送超时]");
        ClientTransaction clientTransaction = timeoutEvent.getClientTransaction();
        if (clientTransaction != null) {
            logger.info("[发送错误订阅] clientTransaction != null");
            Request request = clientTransaction.getRequest();
            if (request != null) {
                logger.info("[发送错误订阅] request != null");
                CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
                if (callIdHeader != null) {
                    logger.info("[发送错误订阅]");
                    SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
                    SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(timeoutEvent);
                    if (subscribe != null){
                        subscribe.response(eventResult);
                    }
                    sipSubscribe.removeOkSubscribe(callIdHeader.getCallId());
                    sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId());
                }
            }
        }
        eventPublisher.requestTimeOut(timeoutEvent);
    }
    @Override
    public void processIOException(IOExceptionEvent exceptionEvent) {
        System.out.println("processIOException");
    }
    @Override
    public void processTransactionTerminated(TransactionTerminatedEvent transactionTerminatedEvent) {
        // No action is taken when a transaction terminates: every statement below is commented out.
        // The disabled code sketches resolving the terminated (server or client) transaction,
        // reading its request's Call-ID and passing the event to the matching error subscriber.
//        if (transactionTerminatedEvent.isServerTransaction()) {
//            ServerTransaction serverTransaction = transactionTerminatedEvent.getServerTransaction();
//            serverTransaction.get
//        }
//        Transaction transaction = null;
//        System.out.println("processTransactionTerminated");
//        if (transactionTerminatedEvent.isServerTransaction()) {
//            transaction = transactionTerminatedEvent.getServerTransaction();
//        }else {
//            transaction = transactionTerminatedEvent.getClientTransaction();
//        }
//
//        System.out.println(transaction.getBranchId());
//        System.out.println(transaction.getState());
//        System.out.println(transaction.getRequest().getMethod());
//        CallIdHeader header = (CallIdHeader)transaction.getRequest().getHeader(CallIdHeader.NAME);
//        SipSubscribe.EventResult<TransactionTerminatedEvent> terminatedEventEventResult = new SipSubscribe.EventResult<>(transactionTerminatedEvent);
//        sipSubscribe.getErrorSubscribe(header.getCallId()).response(terminatedEventEventResult);
    }
    @Override