From c96ab05d7d0fb9ede820d89ea5e5b55820554e29 Mon Sep 17 00:00:00 2001 From: swwheihei <swwheihei@163.com> Date: 星期四, 16 七月 2020 17:31:41 +0800 Subject: [PATCH] 尝试解决内存溢出,并使用多线程提高性能 --- src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java | 177 +++++++++++++++++++++------------------------------------- 1 files changed, 65 insertions(+), 112 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java index 0f6a092..9cc9c25 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java @@ -2,115 +2,113 @@ import java.text.ParseException; import java.util.Properties; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; -import javax.annotation.PostConstruct; import javax.sip.DialogTerminatedEvent; import javax.sip.IOExceptionEvent; import javax.sip.ListeningPoint; +import javax.sip.PeerUnavailableException; import javax.sip.RequestEvent; import javax.sip.ResponseEvent; -import javax.sip.ServerTransaction; import javax.sip.SipFactory; import javax.sip.SipListener; import javax.sip.SipProvider; import javax.sip.SipStack; import javax.sip.TimeoutEvent; -import javax.sip.TransactionAlreadyExistsException; import javax.sip.TransactionTerminatedEvent; -import javax.sip.TransactionUnavailableException; -import javax.sip.address.AddressFactory; -import javax.sip.header.HeaderFactory; -import javax.sip.header.ViaHeader; -import javax.sip.message.MessageFactory; -import javax.sip.message.Request; import javax.sip.message.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorFactory; -import com.genersoft.iot.vmp.gb28181.transmit.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.response.ISIPResponseProcessor; import gov.nist.javax.sip.SipStackImpl; @Component -public class SipLayer implements SipListener, Runnable { +public class SipLayer implements SipListener { private final static Logger logger = LoggerFactory.getLogger(SipLayer.class); @Autowired private SipConfig sipConfig; - private SipProvider tcpSipProvider; - - private SipProvider udpSipProvider; - @Autowired private SIPProcessorFactory processorFactory; private SipStack sipStack; - private AddressFactory addressFactory; - private HeaderFactory headerFactory; - private MessageFactory messageFactory; + private SipFactory sipFactory; - @PostConstruct + /** + * 娑堟伅澶勭悊鍣ㄧ嚎绋嬫睜 + */ + private ThreadPoolExecutor processThreadPool; + + @Bean("initSipServer") + @DependsOn("allOffline") private void initSipServer() { - Thread thread = new Thread(this); - thread.setDaemon(true); - thread.setName("sip server thread start"); - thread.start(); + + int processThreadNum = Runtime.getRuntime().availableProcessors() * 10; + LinkedBlockingQueue<Runnable> processQueue = new LinkedBlockingQueue<Runnable>(10000); + processThreadPool = new ThreadPoolExecutor(processThreadNum,processThreadNum, + 0L,TimeUnit.MILLISECONDS,processQueue, + new ThreadPoolExecutor.CallerRunsPolicy()); } - - @Override - public void run() { - SipFactory sipFactory = SipFactory.getInstance(); + + @Bean("sipFactory") + @DependsOn("initSipServer") + private SipFactory createSipFactory() { + sipFactory = SipFactory.getInstance(); sipFactory.setPathName("gov.nist"); - try { - headerFactory = sipFactory.createHeaderFactory(); - - addressFactory = sipFactory.createAddressFactory(); - messageFactory = sipFactory.createMessageFactory(); - - Properties properties = new Properties(); - properties.setProperty("javax.sip.STACK_NAME", "GB28181_SIP"); - properties.setProperty("javax.sip.IP_ADDRESS", sipConfig.getSipIp()); - properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "false"); - /** - * sip_server_log.log 鍜� sip_debug_log.log public static final int TRACE_NONE = - * 0; public static final int TRACE_MESSAGES = 16; public static final int - * TRACE_EXCEPTION = 17; public static final int TRACE_DEBUG = 32; - */ - properties.setProperty("gov.nist.javax.sip.TRACE_LEVEL", "32"); - properties.setProperty("gov.nist.javax.sip.SERVER_LOG", "sip_server_log"); - properties.setProperty("gov.nist.javax.sip.DEBUG_LOG", "sip_debug_log"); - sipStack = (SipStackImpl) sipFactory.createSipStack(properties); - - startTcpListener(); - startUdpListener(); - } catch (Exception e) { - logger.error("Sip Server 鍚姩澶辫触锛� port {" + sipConfig.getSipPort() + "}"); - e.printStackTrace(); - } - logger.info("Sip Server 鍚姩鎴愬姛 port {" + sipConfig.getSipPort() + "}"); + return sipFactory; + } + + @Bean("sipStack") + @DependsOn({"initSipServer", "sipFactory"}) + private SipStack createSipStack() throws PeerUnavailableException { + Properties properties = new Properties(); + properties.setProperty("javax.sip.STACK_NAME", "GB28181_SIP"); + properties.setProperty("javax.sip.IP_ADDRESS", sipConfig.getSipIp()); + properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "false"); + /** + * sip_server_log.log 鍜� sip_debug_log.log public static final int TRACE_NONE = + * 0; public static final int TRACE_MESSAGES = 16; public static final int + * TRACE_EXCEPTION = 17; public static final int TRACE_DEBUG = 32; + */ + properties.setProperty("gov.nist.javax.sip.TRACE_LEVEL", "0"); + properties.setProperty("gov.nist.javax.sip.SERVER_LOG", "sip_server_log"); + properties.setProperty("gov.nist.javax.sip.DEBUG_LOG", "sip_debug_log"); + sipStack = (SipStackImpl) sipFactory.createSipStack(properties); + return sipStack; } - private void startTcpListener() throws Exception { - ListeningPoint tcpListeningPoint = sipStack.createListeningPoint(sipConfig.getSipIp(), sipConfig.getSipPort(), - "TCP"); - tcpSipProvider = sipStack.createSipProvider(tcpListeningPoint); + @Bean("tcpSipProvider") + @DependsOn("sipStack") + private SipProvider startTcpListener() throws Exception { + ListeningPoint tcpListeningPoint = sipStack.createListeningPoint(sipConfig.getSipIp(), sipConfig.getSipPort(), "TCP"); + SipProvider tcpSipProvider = sipStack.createSipProvider(tcpListeningPoint); tcpSipProvider.addSipListener(this); + logger.info("Sip Server TCP 鍚姩鎴愬姛 port {" + sipConfig.getSipPort() + "}"); + return tcpSipProvider; } - - private void startUdpListener() throws Exception { - ListeningPoint udpListeningPoint = sipStack.createListeningPoint(sipConfig.getSipIp(), sipConfig.getSipPort(), - "UDP"); - udpSipProvider = sipStack.createSipProvider(udpListeningPoint); + + @Bean("udpSipProvider") + @DependsOn("sipStack") + private SipProvider startUdpListener() throws Exception { + ListeningPoint udpListeningPoint = sipStack.createListeningPoint(sipConfig.getSipIp(), sipConfig.getSipPort(), "UDP"); + SipProvider udpSipProvider = sipStack.createSipProvider(udpListeningPoint); udpSipProvider.addSipListener(this); + logger.info("Sip Server TCP 鍚姩鎴愬姛 port {" + sipConfig.getSipPort() + "}"); + return udpSipProvider; } /** @@ -119,8 +117,10 @@ */ @Override public void processRequest(RequestEvent evt) { - ISIPRequestProcessor processor = processorFactory.createRequestProcessor(evt); - processor.process(evt, this); + // 鐢变簬jainsip鏄崟绾跨▼绋嬪簭锛屼负鎻愰珮鎬ц兘骞跺彂澶勭悊 + processThreadPool.execute(() -> { + processorFactory.createRequestProcessor(evt).process(); + }); } @Override @@ -210,53 +210,6 @@ public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) { // TODO Auto-generated method stub - } - - public ServerTransaction getServerTransaction(RequestEvent evt) { - Request request = evt.getRequest(); - ServerTransaction serverTransaction = evt.getServerTransaction(); - // 鍒ゆ柇TCP杩樻槸UDP - boolean isTcp = false; - ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME); - String transport = reqViaHeader.getTransport(); - if (transport.equals("TCP")) { - isTcp = true; - } - - if (serverTransaction == null) { - try { - if (isTcp) { - serverTransaction = tcpSipProvider.getNewServerTransaction(request); - } else { - serverTransaction = udpSipProvider.getNewServerTransaction(request); - } - } catch (TransactionAlreadyExistsException e) { - e.printStackTrace(); - } catch (TransactionUnavailableException e) { - e.printStackTrace(); - } - } - return serverTransaction; - } - - public AddressFactory getAddressFactory() { - return addressFactory; - } - - public HeaderFactory getHeaderFactory() { - return headerFactory; - } - - public MessageFactory getMessageFactory() { - return messageFactory; - } - - public SipProvider getTcpSipProvider() { - return tcpSipProvider; - } - - public SipProvider getUdpSipProvider() { - return udpSipProvider; } } -- Gitblit v1.8.0