From c084d6c98af1ef4d36a61adc719df5db76589428 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期六, 07 十月 2023 18:00:08 +0800 Subject: [PATCH] 优化国标级联心跳失败判断逻辑 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java | 162 ++++++++++++++++++++++++++++++++--------------------- 1 files changed, 98 insertions(+), 64 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index 9149be1..52356c1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -1,21 +1,22 @@ package com.genersoft.iot.vmp.gb28181.transmit; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.response.ISIPResponseProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.timeout.ITimeoutProcessor; +import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Async; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import javax.sip.*; -import javax.sip.header.CSeqHeader; -import javax.sip.header.CallIdHeader; +import javax.sip.header.*; +import javax.sip.message.Request; import javax.sip.message.Response; +import java.net.InetAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -29,7 +30,7 @@ private final static Logger logger = LoggerFactory.getLogger(SIPProcessorObserver.class); - private static Map<String, ISIPRequestProcessor> requestProcessorMap = new ConcurrentHashMap<>(); + private static Map<String, ISIPRequestProcessor> requestProcessorMap = new ConcurrentHashMap<>(); private static Map<String, ISIPResponseProcessor> responseProcessorMap = new ConcurrentHashMap<>(); private static ITimeoutProcessor timeoutProcessor; @@ -37,8 +38,7 @@ private SipSubscribe sipSubscribe; @Autowired - @Qualifier(value = "taskExecutor") - private ThreadPoolTaskExecutor poolTaskExecutor; + private EventPublisher eventPublisher; /** * 娣诲姞 request璁㈤槄 @@ -63,7 +63,7 @@ * @param processor 澶勭悊绋嬪簭 */ public void addTimeoutProcessor(ITimeoutProcessor processor) { - this.timeoutProcessor = processor; + timeoutProcessor = processor; } /** @@ -71,17 +71,16 @@ * @param requestEvent RequestEvent浜嬩欢 */ @Override + @Async("taskExecutor") public void processRequest(RequestEvent requestEvent) { - - poolTaskExecutor.execute(() -> { - String method = requestEvent.getRequest().getMethod(); - ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method); - if (sipRequestProcessor == null) { - logger.warn("涓嶆敮鎸佹柟娉晎}鐨剅equest", method); - return; - } - requestProcessorMap.get(method).process(requestEvent); - }); + String method = requestEvent.getRequest().getMethod(); + ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method); + if (sipRequestProcessor == null) { + logger.warn("涓嶆敮鎸佹柟娉晎}鐨剅equest", method); + // TODO 鍥炲閿欒鐜� + return; + } + requestProcessorMap.get(method).process(requestEvent); } @@ -90,55 +89,49 @@ * @param responseEvent responseEvent浜嬩欢 */ @Override + @Async("taskExecutor") public void processResponse(ResponseEvent responseEvent) { - logger.debug(responseEvent.getResponse().toString()); -// CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); -// String method = cseqHeader.getMethod(); -// ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); -// if (sipRequestProcessor == null) { -// logger.warn("涓嶆敮鎸佹柟娉晎}鐨剅esponse", method); -// return; -// } -// sipRequestProcessor.process(responseEvent); + Response response = responseEvent.getResponse(); + int status = response.getStatusCode(); - poolTaskExecutor.execute(() -> { - Response response = responseEvent.getResponse(); - logger.debug(responseEvent.getResponse().toString()); - int status = response.getStatusCode(); - if (((status >= 200) && (status < 300)) || status == 401) { // Success! -// ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt); - CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); - String method = cseqHeader.getMethod(); - ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); - if (sipRequestProcessor != null) { - sipRequestProcessor.process(responseEvent); - } - if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { - CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); - if (callIdHeader != null) { - SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); - if (subscribe != null) { - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); - subscribe.response(eventResult); - } - } - } - } else if ((status >= 100) && (status < 200)) { - // 澧炲姞鍏跺畠鏃犻渶鍥炲鐨勫搷搴旓紝濡�101銆�180绛� - } else { - logger.warn("鎺ユ敹鍒板け璐ョ殑response鍝嶅簲锛乻tatus锛�" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/); - if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) { - CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); - if (callIdHeader != null) { - SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); - if (subscribe != null) { - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); - subscribe.response(eventResult); - } + // Success + if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) { + CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); + String method = cseqHeader.getMethod(); + ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); + if (sipRequestProcessor != null) { + sipRequestProcessor.process(responseEvent); + } + if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { + CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); + if (callIdHeader != null) { + SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); + if (subscribe != null) { + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); + sipSubscribe.removeOkSubscribe(callIdHeader.getCallId()); + subscribe.response(eventResult); } } } - }); + } else if ((status >= Response.TRYING) && (status < Response.OK)) { + // 澧炲姞鍏跺畠鏃犻渶鍥炲鐨勫搷搴旓紝濡�101銆�180绛� + } else { + logger.warn("鎺ユ敹鍒板け璐ョ殑response鍝嶅簲锛乻tatus锛�" + status + ",message:" + response.getReasonPhrase()); + if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) { + CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); + if (callIdHeader != null) { + SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); + if (subscribe != null) { + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); + subscribe.response(eventResult); + sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId()); + } + } + } + if (responseEvent.getDialog() != null) { + responseEvent.getDialog().delete(); + } + } } @@ -149,17 +142,58 @@ */ @Override public void processTimeout(TimeoutEvent timeoutEvent) { - if(timeoutProcessor != null) { - timeoutProcessor.process(timeoutEvent); + logger.info("[娑堟伅鍙戦�佽秴鏃禲"); + ClientTransaction clientTransaction = timeoutEvent.getClientTransaction(); + + if (clientTransaction != null) { + logger.info("[鍙戦�侀敊璇闃匽 clientTransaction != null"); + Request request = clientTransaction.getRequest(); + if (request != null) { + logger.info("[鍙戦�侀敊璇闃匽 request != null"); + CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); + if (callIdHeader != null) { + logger.info("[鍙戦�侀敊璇闃匽"); + SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(timeoutEvent); + if (subscribe != null){ + subscribe.response(eventResult); + } + sipSubscribe.removeOkSubscribe(callIdHeader.getCallId()); + sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId()); + } + } } + eventPublisher.requestTimeOut(timeoutEvent); } @Override public void processIOException(IOExceptionEvent exceptionEvent) { + System.out.println("processIOException"); } @Override public void processTransactionTerminated(TransactionTerminatedEvent transactionTerminatedEvent) { +// if (transactionTerminatedEvent.isServerTransaction()) { +// ServerTransaction serverTransaction = transactionTerminatedEvent.getServerTransaction(); +// serverTransaction.get +// } + + +// Transaction transaction = null; +// System.out.println("processTransactionTerminated"); +// if (transactionTerminatedEvent.isServerTransaction()) { +// transaction = transactionTerminatedEvent.getServerTransaction(); +// }else { +// transaction = transactionTerminatedEvent.getClientTransaction(); +// } +// +// System.out.println(transaction.getBranchId()); +// System.out.println(transaction.getState()); +// System.out.println(transaction.getRequest().getMethod()); +// CallIdHeader header = (CallIdHeader)transaction.getRequest().getHeader(CallIdHeader.NAME); +// SipSubscribe.EventResult<TransactionTerminatedEvent> terminatedEventEventResult = new SipSubscribe.EventResult<>(transactionTerminatedEvent); + +// sipSubscribe.getErrorSubscribe(header.getCallId()).response(terminatedEventEventResult); } @Override -- Gitblit v1.8.0