From 80bfd9ce026e67dd2eb86c519b5b1b70e3cc7d12 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 17 十一月 2021 17:03:48 +0800 Subject: [PATCH] 使用@Async多线程处理sip消息 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java | 5 +- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java | 100 ++++++++++++++++++++++---------------------------- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java | 3 - src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/NotifyMessageHandler.java | 5 ++ 4 files changed, 51 insertions(+), 62 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index 9149be1..7329e9f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -36,9 +36,9 @@ @Autowired private SipSubscribe sipSubscribe; - @Autowired - @Qualifier(value = "taskExecutor") - private ThreadPoolTaskExecutor poolTaskExecutor; +// @Autowired +// @Qualifier(value = "taskExecutor") +// private ThreadPoolTaskExecutor poolTaskExecutor; /** * 娣诲姞 request璁㈤槄 @@ -71,17 +71,15 @@ * @param requestEvent RequestEvent浜嬩欢 */ @Override + @Async public void processRequest(RequestEvent requestEvent) { - - poolTaskExecutor.execute(() -> { - String method = requestEvent.getRequest().getMethod(); - ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method); - if (sipRequestProcessor == null) { - logger.warn("涓嶆敮鎸佹柟娉晎}鐨剅equest", method); - return; - } - requestProcessorMap.get(method).process(requestEvent); - }); + String method = requestEvent.getRequest().getMethod(); + ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method); + if (sipRequestProcessor == null) { + logger.warn("涓嶆敮鎸佹柟娉晎}鐨剅equest", method); + return; + } + requestProcessorMap.get(method).process(requestEvent); } @@ -90,55 +88,45 @@ * @param responseEvent responseEvent浜嬩欢 */ @Override + @Async public void processResponse(ResponseEvent responseEvent) { logger.debug(responseEvent.getResponse().toString()); -// CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); -// String method = cseqHeader.getMethod(); -// ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); -// if (sipRequestProcessor == null) { -// logger.warn("涓嶆敮鎸佹柟娉晎}鐨剅esponse", method); -// return; -// } -// sipRequestProcessor.process(responseEvent); - - poolTaskExecutor.execute(() -> { - Response response = responseEvent.getResponse(); - logger.debug(responseEvent.getResponse().toString()); - int status = response.getStatusCode(); - if (((status >= 200) && (status < 300)) || status == 401) { // Success! + Response response = responseEvent.getResponse(); + logger.debug(responseEvent.getResponse().toString()); + int status = response.getStatusCode(); + if (((status >= 200) && (status < 300)) || status == 401) { // Success! // ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt); - CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); - String method = cseqHeader.getMethod(); - ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); - if (sipRequestProcessor != null) { - sipRequestProcessor.process(responseEvent); - } - if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { - CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); - if (callIdHeader != null) { - SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); - if (subscribe != null) { - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); - subscribe.response(eventResult); - } - } - } - } else if ((status >= 100) && (status < 200)) { - // 澧炲姞鍏跺畠鏃犻渶鍥炲鐨勫搷搴旓紝濡�101銆�180绛� - } else { - logger.warn("鎺ユ敹鍒板け璐ョ殑response鍝嶅簲锛乻tatus锛�" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/); - if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) { - CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); - if (callIdHeader != null) { - SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); - if (subscribe != null) { - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); - subscribe.response(eventResult); - } + CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); + String method = cseqHeader.getMethod(); + ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); + if (sipRequestProcessor != null) { + sipRequestProcessor.process(responseEvent); + } + if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { + CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); + if (callIdHeader != null) { + SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); + if (subscribe != null) { + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); + subscribe.response(eventResult); } } } - }); + } else if ((status >= 100) && (status < 200)) { + // 澧炲姞鍏跺畠鏃犻渶鍥炲鐨勫搷搴旓紝濡�101銆�180绛� + } else { + logger.warn("鎺ユ敹鍒板け璐ョ殑response鍝嶅簲锛乻tatus锛�" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/); + if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) { + CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); + if (callIdHeader != null) { + SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); + if (subscribe != null) { + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); + subscribe.response(eventResult); + } + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index c97b55a..64812b6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -36,7 +36,8 @@ @Component public class ByeRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { - private Logger logger = LoggerFactory.getLogger(ByeRequestProcessor.class); + private final Logger logger = LoggerFactory.getLogger(ByeRequestProcessor.class); + private final String method = "BYE"; @Autowired private ISIPCommander cmder; @@ -52,8 +53,6 @@ @Autowired private IMediaServerService mediaServerService; - - private String method = "BYE"; @Autowired private SIPProcessorObserver sipProcessorObserver; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java index efc8259..82d790f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java @@ -16,9 +16,6 @@ public static Map<String, IMessageHandler> messageHandlerMap = new ConcurrentHashMap<>(); - @Autowired - public MessageRequestProcessor messageRequestProcessor; - public void addHandler(String cmdType, IMessageHandler messageHandler) { messageHandlerMap.put(cmdType, messageHandler); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/NotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/NotifyMessageHandler.java index 56c020b..6965a1c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/NotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/NotifyMessageHandler.java @@ -1,7 +1,9 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageHandlerAbstract; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageRequestProcessor; import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @@ -9,6 +11,9 @@ private final String messageType = "Notify"; + @Autowired + private MessageRequestProcessor messageRequestProcessor; + @Override public void afterPropertiesSet() throws Exception { messageRequestProcessor.addHandler(messageType, this); -- Gitblit v1.8.0