648540858
2021-11-17 9561e952a382f1214e386c9009e5f7eea9e78802
添加使用多线程消息处理sip消息
5个文件已修改
2个文件已添加
181 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java 95 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java
@@ -6,6 +6,7 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import springfox.documentation.oas.annotations.EnableOpenApi;
src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java
New file
@@ -0,0 +1,57 @@
package com.genersoft.iot.vmp.conf;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
    /**
     *   默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,
     *    当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
     *  当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝
     */
    /**
     * 核心线程数(默认线程数)
     */
    private static final int corePoolSize = 5;
    /**
     * 最大线程数
     */
    private static final int maxPoolSize = 30;
    /**
     * 允许线程空闲时间(单位:默认为秒)
     */
    private static final int keepAliveTime = 30;
    /**
     * 缓冲队列大小
     */
    private static final int queueCapacity = 10000;
    /**
     * 线程池名前缀
     */
    private static final String threadNamePrefix = "hdl-uhi-service-";
    @Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveTime);
        executor.setThreadNamePrefix(threadNamePrefix);
        // 线程池对拒绝任务的处理策略
        // CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化
        executor.initialize();
        return executor;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -2,6 +2,7 @@
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.ISIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
@@ -28,27 +29,11 @@
    private SipConfig sipConfig;
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
    @Autowired
    private SipSubscribe sipSubscribe;
    private ISIPProcessorObserver sipProcessorObserver;
    private SipStackImpl sipStack;
    private SipFactory sipFactory;
    /**
     * 消息处理器线程池
     */
    private ThreadPoolExecutor processThreadPool;
    public SipLayer() {
        int processThreadNum = Runtime.getRuntime().availableProcessors() * 10;
        LinkedBlockingQueue<Runnable> processQueue = new LinkedBlockingQueue<>(10000);
        processThreadPool = new ThreadPoolExecutor(processThreadNum,processThreadNum,
                0L,TimeUnit.MILLISECONDS,processQueue,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
    @Bean("sipFactory")
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java
New file
@@ -0,0 +1,6 @@
package com.genersoft.iot.vmp.gb28181.transmit;
import javax.sip.SipListener;
public interface ISIPProcessorObserver extends SipListener {
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
@@ -7,6 +7,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.sip.*;
@@ -22,7 +25,7 @@
 * @date:   2021年11月5日 下午15:32
 */
@Component
public class SIPProcessorObserver implements SipListener {
public class SIPProcessorObserver implements ISIPProcessorObserver {
    private final static Logger logger = LoggerFactory.getLogger(SIPProcessorObserver.class);
@@ -32,6 +35,10 @@
    @Autowired
    private SipSubscribe sipSubscribe;
    @Autowired
    @Qualifier(value = "taskExecutor")
    private ThreadPoolTaskExecutor poolTaskExecutor;
    /**
     * 添加 request订阅
@@ -65,13 +72,17 @@
     */
    @Override
    public void processRequest(RequestEvent requestEvent) {
        String method = requestEvent.getRequest().getMethod();
        ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method);
        if (sipRequestProcessor == null) {
            logger.warn("不支持方法{}的request", method);
            return;
        }
        requestProcessorMap.get(method).process(requestEvent);
        poolTaskExecutor.execute(() -> {
            String method = requestEvent.getRequest().getMethod();
            ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method);
            if (sipRequestProcessor == null) {
                logger.warn("不支持方法{}的request", method);
                return;
            }
            requestProcessorMap.get(method).process(requestEvent);
        });
    }
    /**
@@ -90,43 +101,45 @@
//        }
//        sipRequestProcessor.process(responseEvent);
        Response response = responseEvent.getResponse();
        logger.debug(responseEvent.getResponse().toString());
        int status = response.getStatusCode();
        if (((status >= 200) && (status < 300)) || status == 401) { // Success!
        poolTaskExecutor.execute(() -> {
            Response response = responseEvent.getResponse();
            logger.debug(responseEvent.getResponse().toString());
            int status = response.getStatusCode();
            if (((status >= 200) && (status < 300)) || status == 401) { // Success!
//            ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt);
            CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
            String method = cseqHeader.getMethod();
            ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
            if (sipRequestProcessor != null) {
                sipRequestProcessor.process(responseEvent);
            }
            if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
                CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
                if (callIdHeader != null) {
                    SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
                    if (subscribe != null) {
                        SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
                        subscribe.response(eventResult);
                CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
                String method = cseqHeader.getMethod();
                ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
                if (sipRequestProcessor != null) {
                    sipRequestProcessor.process(responseEvent);
                }
                if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
                    CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
                    if (callIdHeader != null) {
                        SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
                        if (subscribe != null) {
                            SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
                            subscribe.response(eventResult);
                        }
                    }
                }
            } else if ((status >= 100) && (status < 200)) {
                // 增加其它无需回复的响应,如101、180等
            } else {
                logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/);
                if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
                    CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
                    if (callIdHeader != null) {
                        SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
                        if (subscribe != null) {
                            SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
                            subscribe.response(eventResult);
                        }
                    }
                }
            }
        } else if ((status >= 100) && (status < 200)) {
            // 增加其它无需回复的响应,如101、180等
        } else {
            logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/);
            if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
                CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
                if (callIdHeader != null) {
                    SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
                    if (subscribe != null) {
                        SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
                        subscribe.response(eventResult);
                    }
                }
            }
        }
        });
    }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
@@ -204,7 +204,6 @@
        // Event
        EventHeader eventHeader = sipFactory.createHeaderFactory().createEventHeader(event);
        eventHeader.setEventType("Catalog");
        request.addHeader(eventHeader);
        ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml");
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -1496,7 +1496,7 @@
            CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "presence" , callIdHeader);
            Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" , callIdHeader);
            transmitRequest(device, request, errorEvent, okEvent);
            return true;