From 9561e952a382f1214e386c9009e5f7eea9e78802 Mon Sep 17 00:00:00 2001
From: 648540858 <456PANlinlin>
Date: 星期三, 17 十一月 2021 00:07:34 +0800
Subject: [PATCH] 添加使用多线程消息处理sip消息

---
 src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java                              |   19 ----
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java |    1 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java        |    2 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java         |   95 +++++++++++++----------
 src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java                              |    1 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java        |    6 +
 src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java                     |   57 ++++++++++++++
 7 files changed, 121 insertions(+), 60 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java b/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java
index 56038bd..bfe5841 100644
--- a/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java
+++ b/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java
@@ -6,6 +6,7 @@
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.web.servlet.ServletComponentScan;
 import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.scheduling.annotation.EnableAsync;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import springfox.documentation.oas.annotations.EnableOpenApi;
 
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java
new file mode 100644
index 0000000..f2edf04
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java
@@ -0,0 +1,57 @@
+package com.genersoft.iot.vmp.conf;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+@Configuration
+@EnableAsync
+public class ThreadPoolTaskConfig {
+
+    /**
+     *   榛樿鎯呭喌涓嬶紝鍦ㄥ垱寤轰簡绾跨▼姹犲悗锛岀嚎绋嬫睜涓殑绾跨▼鏁颁负0锛屽綋鏈変换鍔℃潵涔嬪悗锛屽氨浼氬垱寤轰竴涓嚎绋嬪幓鎵ц浠诲姟锛�
+     *    褰撶嚎绋嬫睜涓殑绾跨▼鏁扮洰杈惧埌corePoolSize鍚庯紝灏变細鎶婂埌杈剧殑浠诲姟鏀惧埌缂撳瓨闃熷垪褰撲腑锛�
+     *  褰撻槦鍒楁弧浜嗭紝灏辩户缁垱寤虹嚎绋嬶紝褰撶嚎绋嬫暟閲忓ぇ浜庣瓑浜巑axPoolSize鍚庯紝寮�濮嬩娇鐢ㄦ嫆缁濈瓥鐣ユ嫆缁�
+     */
+
+    /**
+     * 鏍稿績绾跨▼鏁帮紙榛樿绾跨▼鏁帮級
+     */
+    private static final int corePoolSize = 5;
+    /**
+     * 鏈�澶х嚎绋嬫暟
+     */
+    private static final int maxPoolSize = 30;
+    /**
+     * 鍏佽绾跨▼绌洪棽鏃堕棿锛堝崟浣嶏細榛樿涓虹锛�
+     */
+    private static final int keepAliveTime = 30;
+    /**
+     * 缂撳啿闃熷垪澶у皬
+     */
+    private static final int queueCapacity = 10000;
+    /**
+     * 绾跨▼姹犲悕鍓嶇紑
+     */
+    private static final String threadNamePrefix = "hdl-uhi-service-";
+
+    @Bean("taskExecutor") // bean鐨勫悕绉帮紝榛樿涓洪瀛楁瘝灏忓啓鐨勬柟娉曞悕
+    public ThreadPoolTaskExecutor taskExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(corePoolSize);
+        executor.setMaxPoolSize(maxPoolSize);
+        executor.setQueueCapacity(queueCapacity);
+        executor.setKeepAliveSeconds(keepAliveTime);
+        executor.setThreadNamePrefix(threadNamePrefix);
+
+        // 绾跨▼姹犲鎷掔粷浠诲姟鐨勫鐞嗙瓥鐣�
+        // CallerRunsPolicy锛氱敱璋冪敤绾跨▼锛堟彁浜や换鍔$殑绾跨▼锛夊鐞嗚浠诲姟
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        // 鍒濆鍖�
+        executor.initialize();
+        return executor;
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
index 3d9b827..420a30a 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -2,6 +2,7 @@
 
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
+import com.genersoft.iot.vmp.gb28181.transmit.ISIPProcessorObserver;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
 import gov.nist.javax.sip.SipProviderImpl;
 import gov.nist.javax.sip.SipStackImpl;
@@ -28,27 +29,11 @@
 	private SipConfig sipConfig;
 
 	@Autowired
-	private SIPProcessorObserver sipProcessorObserver;
-
-	@Autowired
-	private SipSubscribe sipSubscribe;
+	private ISIPProcessorObserver sipProcessorObserver;
 
 	private SipStackImpl sipStack;
 
 	private SipFactory sipFactory;
-
-	/**   
-	 * 娑堟伅澶勭悊鍣ㄧ嚎绋嬫睜
-	 */
-	private ThreadPoolExecutor processThreadPool;
-
-	public SipLayer() {
-		int processThreadNum = Runtime.getRuntime().availableProcessors() * 10;
-		LinkedBlockingQueue<Runnable> processQueue = new LinkedBlockingQueue<>(10000);
-		processThreadPool = new ThreadPoolExecutor(processThreadNum,processThreadNum,
-				0L,TimeUnit.MILLISECONDS,processQueue,
-				new ThreadPoolExecutor.CallerRunsPolicy());
-	}
 
 
 	@Bean("sipFactory")
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java
new file mode 100644
index 0000000..2480f37
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java
@@ -0,0 +1,6 @@
+package com.genersoft.iot.vmp.gb28181.transmit;
+
+import javax.sip.SipListener;
+
+public interface ISIPProcessorObserver extends SipListener {
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
index cac1a01..9149be1 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
@@ -7,6 +7,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
 import javax.sip.*;
@@ -22,7 +25,7 @@
  * @date:   2021骞�11鏈�5鏃� 涓嬪崍15锛�32
  */
 @Component
-public class SIPProcessorObserver implements SipListener {
+public class SIPProcessorObserver implements ISIPProcessorObserver {
 
     private final static Logger logger = LoggerFactory.getLogger(SIPProcessorObserver.class);
 
@@ -32,6 +35,10 @@
 
     @Autowired
     private SipSubscribe sipSubscribe;
+
+    @Autowired
+    @Qualifier(value = "taskExecutor")
+    private ThreadPoolTaskExecutor poolTaskExecutor;
 
     /**
      * 娣诲姞 request璁㈤槄
@@ -65,13 +72,17 @@
      */
     @Override
     public void processRequest(RequestEvent requestEvent) {
-        String method = requestEvent.getRequest().getMethod();
-        ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method);
-        if (sipRequestProcessor == null) {
-            logger.warn("涓嶆敮鎸佹柟娉晎}鐨剅equest", method);
-            return;
-        }
-        requestProcessorMap.get(method).process(requestEvent);
+
+        poolTaskExecutor.execute(() -> {
+            String method = requestEvent.getRequest().getMethod();
+            ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method);
+            if (sipRequestProcessor == null) {
+                logger.warn("涓嶆敮鎸佹柟娉晎}鐨剅equest", method);
+                return;
+            }
+            requestProcessorMap.get(method).process(requestEvent);
+        });
+
     }
 
     /**
@@ -90,43 +101,45 @@
 //        }
 //        sipRequestProcessor.process(responseEvent);
 
-
-        Response response = responseEvent.getResponse();
-        logger.debug(responseEvent.getResponse().toString());
-        int status = response.getStatusCode();
-        if (((status >= 200) && (status < 300)) || status == 401) { // Success!
+        poolTaskExecutor.execute(() -> {
+            Response response = responseEvent.getResponse();
+            logger.debug(responseEvent.getResponse().toString());
+            int status = response.getStatusCode();
+            if (((status >= 200) && (status < 300)) || status == 401) { // Success!
 //            ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt);
-            CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
-            String method = cseqHeader.getMethod();
-            ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
-            if (sipRequestProcessor != null) {
-                sipRequestProcessor.process(responseEvent);
-            }
-            if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
-                CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
-                if (callIdHeader != null) {
-                    SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
-                    if (subscribe != null) {
-                        SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
-                        subscribe.response(eventResult);
+                CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
+                String method = cseqHeader.getMethod();
+                ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
+                if (sipRequestProcessor != null) {
+                    sipRequestProcessor.process(responseEvent);
+                }
+                if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
+                    CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
+                    if (callIdHeader != null) {
+                        SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
+                        if (subscribe != null) {
+                            SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
+                            subscribe.response(eventResult);
+                        }
+                    }
+                }
+            } else if ((status >= 100) && (status < 200)) {
+                // 澧炲姞鍏跺畠鏃犻渶鍥炲鐨勫搷搴旓紝濡�101銆�180绛�
+            } else {
+                logger.warn("鎺ユ敹鍒板け璐ョ殑response鍝嶅簲锛乻tatus锛�" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/);
+                if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
+                    CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
+                    if (callIdHeader != null) {
+                        SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
+                        if (subscribe != null) {
+                            SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
+                            subscribe.response(eventResult);
+                        }
                     }
                 }
             }
-        } else if ((status >= 100) && (status < 200)) {
-            // 澧炲姞鍏跺畠鏃犻渶鍥炲鐨勫搷搴旓紝濡�101銆�180绛�
-        } else {
-            logger.warn("鎺ユ敹鍒板け璐ョ殑response鍝嶅簲锛乻tatus锛�" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/);
-            if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
-                CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
-                if (callIdHeader != null) {
-                    SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
-                    if (subscribe != null) {
-                        SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
-                        subscribe.response(eventResult);
-                    }
-                }
-            }
-        }
+        });
+
 
     }
 
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
index 4eac134..bb62902 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
@@ -204,7 +204,6 @@
 
 		// Event
 		EventHeader eventHeader = sipFactory.createHeaderFactory().createEventHeader(event);
-		eventHeader.setEventType("Catalog");
 		request.addHeader(eventHeader);
 
 		ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml");
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
index 1af2cdf..61647aa 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -1496,7 +1496,7 @@
 			CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
 					: udpSipProvider.getNewCallId();
 
-			Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "presence" , callIdHeader);
+			Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" , callIdHeader);
 			transmitRequest(device, request, errorEvent, okEvent);
 
 			return true;

--
Gitblit v1.8.0