From 9561e952a382f1214e386c9009e5f7eea9e78802 Mon Sep 17 00:00:00 2001
From: 648540858 <456PANlinlin>
Date: 星期三, 17 十一月 2021 00:07:34 +0800
Subject: [PATCH] 添加使用多线程消息处理sip消息
---
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java | 19 ----
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java | 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java | 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java | 95 +++++++++++++----------
src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java | 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java | 6 +
src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java | 57 ++++++++++++++
7 files changed, 121 insertions(+), 60 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java b/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java
index 56038bd..bfe5841 100644
--- a/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java
+++ b/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java
@@ -6,6 +6,7 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import springfox.documentation.oas.annotations.EnableOpenApi;
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java
new file mode 100644
index 0000000..f2edf04
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java
@@ -0,0 +1,57 @@
+package com.genersoft.iot.vmp.conf;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+@Configuration
+@EnableAsync
+public class ThreadPoolTaskConfig {
+
+ /**
+ * 榛樿鎯呭喌涓嬶紝鍦ㄥ垱寤轰簡绾跨▼姹犲悗锛岀嚎绋嬫睜涓殑绾跨▼鏁颁负0锛屽綋鏈変换鍔℃潵涔嬪悗锛屽氨浼氬垱寤轰竴涓嚎绋嬪幓鎵ц浠诲姟锛�
+ * 褰撶嚎绋嬫睜涓殑绾跨▼鏁扮洰杈惧埌corePoolSize鍚庯紝灏变細鎶婂埌杈剧殑浠诲姟鏀惧埌缂撳瓨闃熷垪褰撲腑锛�
+ * 褰撻槦鍒楁弧浜嗭紝灏辩户缁垱寤虹嚎绋嬶紝褰撶嚎绋嬫暟閲忓ぇ浜庣瓑浜巑axPoolSize鍚庯紝寮�濮嬩娇鐢ㄦ嫆缁濈瓥鐣ユ嫆缁�
+ */
+
+ /**
+ * 鏍稿績绾跨▼鏁帮紙榛樿绾跨▼鏁帮級
+ */
+ private static final int corePoolSize = 5;
+ /**
+ * 鏈�澶х嚎绋嬫暟
+ */
+ private static final int maxPoolSize = 30;
+ /**
+ * 鍏佽绾跨▼绌洪棽鏃堕棿锛堝崟浣嶏細榛樿涓虹锛�
+ */
+ private static final int keepAliveTime = 30;
+ /**
+ * 缂撳啿闃熷垪澶у皬
+ */
+ private static final int queueCapacity = 10000;
+ /**
+ * 绾跨▼姹犲悕鍓嶇紑
+ */
+ private static final String threadNamePrefix = "hdl-uhi-service-";
+
+ @Bean("taskExecutor") // bean鐨勫悕绉帮紝榛樿涓洪瀛楁瘝灏忓啓鐨勬柟娉曞悕
+ public ThreadPoolTaskExecutor taskExecutor() {
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+ executor.setCorePoolSize(corePoolSize);
+ executor.setMaxPoolSize(maxPoolSize);
+ executor.setQueueCapacity(queueCapacity);
+ executor.setKeepAliveSeconds(keepAliveTime);
+ executor.setThreadNamePrefix(threadNamePrefix);
+
+ // 绾跨▼姹犲鎷掔粷浠诲姟鐨勫鐞嗙瓥鐣�
+ // CallerRunsPolicy锛氱敱璋冪敤绾跨▼锛堟彁浜や换鍔$殑绾跨▼锛夊鐞嗚浠诲姟
+ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+ // 鍒濆鍖�
+ executor.initialize();
+ return executor;
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
index 3d9b827..420a30a 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -2,6 +2,7 @@
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
+import com.genersoft.iot.vmp.gb28181.transmit.ISIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
@@ -28,27 +29,11 @@
private SipConfig sipConfig;
@Autowired
- private SIPProcessorObserver sipProcessorObserver;
-
- @Autowired
- private SipSubscribe sipSubscribe;
+ private ISIPProcessorObserver sipProcessorObserver;
private SipStackImpl sipStack;
private SipFactory sipFactory;
-
- /**
- * 娑堟伅澶勭悊鍣ㄧ嚎绋嬫睜
- */
- private ThreadPoolExecutor processThreadPool;
-
- public SipLayer() {
- int processThreadNum = Runtime.getRuntime().availableProcessors() * 10;
- LinkedBlockingQueue<Runnable> processQueue = new LinkedBlockingQueue<>(10000);
- processThreadPool = new ThreadPoolExecutor(processThreadNum,processThreadNum,
- 0L,TimeUnit.MILLISECONDS,processQueue,
- new ThreadPoolExecutor.CallerRunsPolicy());
- }
@Bean("sipFactory")
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java
new file mode 100644
index 0000000..2480f37
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java
@@ -0,0 +1,6 @@
+package com.genersoft.iot.vmp.gb28181.transmit;
+
+import javax.sip.SipListener;
+
+public interface ISIPProcessorObserver extends SipListener {
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
index cac1a01..9149be1 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
@@ -7,6 +7,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.sip.*;
@@ -22,7 +25,7 @@
* @date: 2021骞�11鏈�5鏃� 涓嬪崍15锛�32
*/
@Component
-public class SIPProcessorObserver implements SipListener {
+public class SIPProcessorObserver implements ISIPProcessorObserver {
private final static Logger logger = LoggerFactory.getLogger(SIPProcessorObserver.class);
@@ -32,6 +35,10 @@
@Autowired
private SipSubscribe sipSubscribe;
+
+ @Autowired
+ @Qualifier(value = "taskExecutor")
+ private ThreadPoolTaskExecutor poolTaskExecutor;
/**
* 娣诲姞 request璁㈤槄
@@ -65,13 +72,17 @@
*/
@Override
public void processRequest(RequestEvent requestEvent) {
- String method = requestEvent.getRequest().getMethod();
- ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method);
- if (sipRequestProcessor == null) {
- logger.warn("涓嶆敮鎸佹柟娉晎}鐨剅equest", method);
- return;
- }
- requestProcessorMap.get(method).process(requestEvent);
+
+ poolTaskExecutor.execute(() -> {
+ String method = requestEvent.getRequest().getMethod();
+ ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method);
+ if (sipRequestProcessor == null) {
+ logger.warn("涓嶆敮鎸佹柟娉晎}鐨剅equest", method);
+ return;
+ }
+ requestProcessorMap.get(method).process(requestEvent);
+ });
+
}
/**
@@ -90,43 +101,45 @@
// }
// sipRequestProcessor.process(responseEvent);
-
- Response response = responseEvent.getResponse();
- logger.debug(responseEvent.getResponse().toString());
- int status = response.getStatusCode();
- if (((status >= 200) && (status < 300)) || status == 401) { // Success!
+ poolTaskExecutor.execute(() -> {
+ Response response = responseEvent.getResponse();
+ logger.debug(responseEvent.getResponse().toString());
+ int status = response.getStatusCode();
+ if (((status >= 200) && (status < 300)) || status == 401) { // Success!
// ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt);
- CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
- String method = cseqHeader.getMethod();
- ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
- if (sipRequestProcessor != null) {
- sipRequestProcessor.process(responseEvent);
- }
- if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
- CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
- if (callIdHeader != null) {
- SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
- if (subscribe != null) {
- SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
- subscribe.response(eventResult);
+ CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
+ String method = cseqHeader.getMethod();
+ ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
+ if (sipRequestProcessor != null) {
+ sipRequestProcessor.process(responseEvent);
+ }
+ if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
+ CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
+ if (callIdHeader != null) {
+ SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
+ if (subscribe != null) {
+ SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
+ subscribe.response(eventResult);
+ }
+ }
+ }
+ } else if ((status >= 100) && (status < 200)) {
+ // 澧炲姞鍏跺畠鏃犻渶鍥炲鐨勫搷搴旓紝濡�101銆�180绛�
+ } else {
+ logger.warn("鎺ユ敹鍒板け璐ョ殑response鍝嶅簲锛乻tatus锛�" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/);
+ if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
+ CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
+ if (callIdHeader != null) {
+ SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
+ if (subscribe != null) {
+ SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
+ subscribe.response(eventResult);
+ }
}
}
}
- } else if ((status >= 100) && (status < 200)) {
- // 澧炲姞鍏跺畠鏃犻渶鍥炲鐨勫搷搴旓紝濡�101銆�180绛�
- } else {
- logger.warn("鎺ユ敹鍒板け璐ョ殑response鍝嶅簲锛乻tatus锛�" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/);
- if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
- CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
- if (callIdHeader != null) {
- SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
- if (subscribe != null) {
- SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
- subscribe.response(eventResult);
- }
- }
- }
- }
+ });
+
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
index 4eac134..bb62902 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
@@ -204,7 +204,6 @@
// Event
EventHeader eventHeader = sipFactory.createHeaderFactory().createEventHeader(event);
- eventHeader.setEventType("Catalog");
request.addHeader(eventHeader);
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml");
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
index 1af2cdf..61647aa 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -1496,7 +1496,7 @@
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
- Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "presence" , callIdHeader);
+ Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" , callIdHeader);
transmitRequest(device, request, errorEvent, okEvent);
return true;
--
Gitblit v1.8.0