src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java
@@ -6,6 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.servlet.ServletComponentScan; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import springfox.documentation.oas.annotations.EnableOpenApi; src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java
New file @@ -0,0 +1,57 @@
package com.genersoft.iot.vmp.conf;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * Spring configuration that enables asynchronous execution and registers the
 * shared {@code taskExecutor} thread pool.
 *
 * <p>How the pool grows: a freshly created pool has no threads; a thread is
 * started for each arriving task until the core size is reached, after which
 * new tasks are placed in the queue. Only once the queue is full are further
 * threads started, and when the thread count has reached the maximum the
 * rejection policy is applied.
 */
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {

    /** Core (default) number of threads. */
    private static final int CORE_POOL_SIZE = 5;

    /** Maximum number of threads. */
    private static final int MAX_POOL_SIZE = 30;

    /** How long an idle thread may live, in seconds. */
    private static final int KEEP_ALIVE_SECONDS = 30;

    /** Capacity of the buffering task queue. */
    private static final int QUEUE_CAPACITY = 10000;

    /** Prefix applied to the names of the pool's threads. */
    private static final String THREAD_NAME_PREFIX = "hdl-uhi-service-";

    /**
     * Builds and initializes the executor registered under the bean name
     * {@code taskExecutor} (given explicitly; it matches the method name).
     *
     * @return the initialized executor
     */
    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Naming and sizing.
        pool.setThreadNamePrefix(THREAD_NAME_PREFIX);
        pool.setCorePoolSize(CORE_POOL_SIZE);
        pool.setMaxPoolSize(MAX_POOL_SIZE);
        pool.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
        pool.setQueueCapacity(QUEUE_CAPACITY);

        // Rejected tasks are run by the calling thread, i.e. the thread that
        // submitted them (CallerRunsPolicy).
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Initialize explicitly before handing the executor out.
        pool.initialize();
        return pool;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -2,6 +2,7 @@ import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.ISIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.SipStackImpl; @@ -28,27 +29,11 @@ private SipConfig sipConfig; @Autowired private SIPProcessorObserver sipProcessorObserver; @Autowired private SipSubscribe sipSubscribe; private ISIPProcessorObserver sipProcessorObserver; private SipStackImpl sipStack; private SipFactory sipFactory; /** * 消息处理器线程池 */ private ThreadPoolExecutor processThreadPool; public SipLayer() { int processThreadNum = Runtime.getRuntime().availableProcessors() * 10; LinkedBlockingQueue<Runnable> processQueue = new LinkedBlockingQueue<>(10000); processThreadPool = new ThreadPoolExecutor(processThreadNum,processThreadNum, 0L,TimeUnit.MILLISECONDS,processQueue, new ThreadPoolExecutor.CallerRunsPolicy()); } @Bean("sipFactory") src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java
New file @@ -0,0 +1,6 @@
package com.genersoft.iot.vmp.gb28181.transmit;

import javax.sip.SipListener;

/**
 * Project-level SIP listener type.
 *
 * <p>Extends {@link SipListener} without declaring any additional members, so
 * an implementation only has to satisfy the {@code SipListener} contract.
 *
 * <p>NOTE(review): presumably introduced so that callers can depend on this
 * interface rather than on the concrete observer class — confirm against the
 * injection sites.
 */
public interface ISIPProcessorObserver extends SipListener {
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
@@ -7,6 +7,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import javax.sip.*; @@ -22,7 +25,7 @@ * @date: 2021年11月5日 下午15:32 */ @Component public class SIPProcessorObserver implements SipListener { public class SIPProcessorObserver implements ISIPProcessorObserver { private final static Logger logger = LoggerFactory.getLogger(SIPProcessorObserver.class); @@ -32,6 +35,10 @@ @Autowired private SipSubscribe sipSubscribe; @Autowired @Qualifier(value = "taskExecutor") private ThreadPoolTaskExecutor poolTaskExecutor; /** * 添加 request订阅 @@ -65,13 +72,17 @@ */ @Override public void processRequest(RequestEvent requestEvent) { String method = requestEvent.getRequest().getMethod(); ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method); if (sipRequestProcessor == null) { logger.warn("不支持方法{}的request", method); return; } requestProcessorMap.get(method).process(requestEvent); poolTaskExecutor.execute(() -> { String method = requestEvent.getRequest().getMethod(); ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method); if (sipRequestProcessor == null) { logger.warn("不支持方法{}的request", method); return; } requestProcessorMap.get(method).process(requestEvent); }); } /** @@ -90,43 +101,45 @@ // } // sipRequestProcessor.process(responseEvent); Response response = responseEvent.getResponse(); logger.debug(responseEvent.getResponse().toString()); int status = response.getStatusCode(); if (((status >= 200) && (status < 300)) || status == 401) { // Success! 
poolTaskExecutor.execute(() -> { Response response = responseEvent.getResponse(); logger.debug(responseEvent.getResponse().toString()); int status = response.getStatusCode(); if (((status >= 200) && (status < 300)) || status == 401) { // Success! // ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt); CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); String method = cseqHeader.getMethod(); ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); if (sipRequestProcessor != null) { sipRequestProcessor.process(responseEvent); } if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); if (callIdHeader != null) { SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); if (subscribe != null) { SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); subscribe.response(eventResult); CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); String method = cseqHeader.getMethod(); ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); if (sipRequestProcessor != null) { sipRequestProcessor.process(responseEvent); } if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); if (callIdHeader != null) { SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); if (subscribe != null) { SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); subscribe.response(eventResult); } } } } else if ((status >= 100) && (status < 200)) { // 增加其它无需回复的响应,如101、180等 } else { logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* 
.getContent().toString()*/); if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) { CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); if (callIdHeader != null) { SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); if (subscribe != null) { SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); subscribe.response(eventResult); } } } } } else if ((status >= 100) && (status < 200)) { // 增加其它无需回复的响应,如101、180等 } else { logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/); if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) { CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); if (callIdHeader != null) { SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); if (subscribe != null) { SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); subscribe.response(eventResult); } } } } }); } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
@@ -204,7 +204,6 @@ // Event EventHeader eventHeader = sipFactory.createHeaderFactory().createEventHeader(event); eventHeader.setEventType("Catalog"); request.addHeader(eventHeader); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml"); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -1496,7 +1496,7 @@ CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "presence" , callIdHeader); Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" , callIdHeader); transmitRequest(device, request, errorEvent, okEvent); return true;