src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -2,115 +2,113 @@ import java.text.ParseException; import java.util.Properties; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import javax.sip.DialogTerminatedEvent; import javax.sip.IOExceptionEvent; import javax.sip.ListeningPoint; import javax.sip.PeerUnavailableException; import javax.sip.RequestEvent; import javax.sip.ResponseEvent; import javax.sip.ServerTransaction; import javax.sip.SipFactory; import javax.sip.SipListener; import javax.sip.SipProvider; import javax.sip.SipStack; import javax.sip.TimeoutEvent; import javax.sip.TransactionAlreadyExistsException; import javax.sip.TransactionTerminatedEvent; import javax.sip.TransactionUnavailableException; import javax.sip.address.AddressFactory; import javax.sip.header.HeaderFactory; import javax.sip.header.ViaHeader; import javax.sip.message.MessageFactory; import javax.sip.message.Request; import javax.sip.message.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorFactory; import com.genersoft.iot.vmp.gb28181.transmit.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.response.ISIPResponseProcessor; import gov.nist.javax.sip.SipStackImpl; @Component public class SipLayer implements SipListener, Runnable { public class SipLayer implements SipListener { private final static Logger logger = LoggerFactory.getLogger(SipLayer.class); @Autowired private SipConfig sipConfig; private SipProvider tcpSipProvider; private SipProvider udpSipProvider; @Autowired private SIPProcessorFactory processorFactory; private SipStack sipStack; 
private AddressFactory addressFactory; private HeaderFactory headerFactory; private MessageFactory messageFactory; private SipFactory sipFactory; @PostConstruct /** * 消息处理器线程池 */ private ThreadPoolExecutor processThreadPool; @Bean("initSipServer") @DependsOn("allOffline") private void initSipServer() { Thread thread = new Thread(this); thread.setDaemon(true); thread.setName("sip server thread start"); thread.start(); int processThreadNum = Runtime.getRuntime().availableProcessors() * 10; LinkedBlockingQueue<Runnable> processQueue = new LinkedBlockingQueue<Runnable>(10000); processThreadPool = new ThreadPoolExecutor(processThreadNum,processThreadNum, 0L,TimeUnit.MILLISECONDS,processQueue, new ThreadPoolExecutor.CallerRunsPolicy()); } @Override public void run() { SipFactory sipFactory = SipFactory.getInstance(); @Bean("sipFactory") @DependsOn("initSipServer") private SipFactory createSipFactory() { sipFactory = SipFactory.getInstance(); sipFactory.setPathName("gov.nist"); try { headerFactory = sipFactory.createHeaderFactory(); addressFactory = sipFactory.createAddressFactory(); messageFactory = sipFactory.createMessageFactory(); Properties properties = new Properties(); properties.setProperty("javax.sip.STACK_NAME", "GB28181_SIP"); properties.setProperty("javax.sip.IP_ADDRESS", sipConfig.getSipIp()); properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "false"); /** * sip_server_log.log 和 sip_debug_log.log public static final int TRACE_NONE = * 0; public static final int TRACE_MESSAGES = 16; public static final int * TRACE_EXCEPTION = 17; public static final int TRACE_DEBUG = 32; */ properties.setProperty("gov.nist.javax.sip.TRACE_LEVEL", "32"); properties.setProperty("gov.nist.javax.sip.SERVER_LOG", "sip_server_log"); properties.setProperty("gov.nist.javax.sip.DEBUG_LOG", "sip_debug_log"); sipStack = (SipStackImpl) sipFactory.createSipStack(properties); startTcpListener(); startUdpListener(); } catch (Exception e) { logger.error("Sip Server 启动失败! 
port {" + sipConfig.getSipPort() + "}"); e.printStackTrace(); } logger.info("Sip Server 启动成功 port {" + sipConfig.getSipPort() + "}"); return sipFactory; } @Bean("sipStack") @DependsOn({"initSipServer", "sipFactory"}) private SipStack createSipStack() throws PeerUnavailableException { Properties properties = new Properties(); properties.setProperty("javax.sip.STACK_NAME", "GB28181_SIP"); properties.setProperty("javax.sip.IP_ADDRESS", sipConfig.getSipIp()); properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "false"); /** * sip_server_log.log 和 sip_debug_log.log public static final int TRACE_NONE = * 0; public static final int TRACE_MESSAGES = 16; public static final int * TRACE_EXCEPTION = 17; public static final int TRACE_DEBUG = 32; */ properties.setProperty("gov.nist.javax.sip.TRACE_LEVEL", "0"); properties.setProperty("gov.nist.javax.sip.SERVER_LOG", "sip_server_log"); properties.setProperty("gov.nist.javax.sip.DEBUG_LOG", "sip_debug_log"); sipStack = (SipStackImpl) sipFactory.createSipStack(properties); return sipStack; } private void startTcpListener() throws Exception { ListeningPoint tcpListeningPoint = sipStack.createListeningPoint(sipConfig.getSipIp(), sipConfig.getSipPort(), "TCP"); tcpSipProvider = sipStack.createSipProvider(tcpListeningPoint); @Bean("tcpSipProvider") @DependsOn("sipStack") private SipProvider startTcpListener() throws Exception { ListeningPoint tcpListeningPoint = sipStack.createListeningPoint(sipConfig.getSipIp(), sipConfig.getSipPort(), "TCP"); SipProvider tcpSipProvider = sipStack.createSipProvider(tcpListeningPoint); tcpSipProvider.addSipListener(this); logger.info("Sip Server TCP 启动成功 port {" + sipConfig.getSipPort() + "}"); return tcpSipProvider; } private void startUdpListener() throws Exception { ListeningPoint udpListeningPoint = sipStack.createListeningPoint(sipConfig.getSipIp(), sipConfig.getSipPort(), "UDP"); udpSipProvider = sipStack.createSipProvider(udpListeningPoint); @Bean("udpSipProvider") 
@DependsOn("sipStack") private SipProvider startUdpListener() throws Exception { /* Binds a UDP listening point on the configured SIP ip/port, registers this layer as its SipListener and returns the resulting provider. */ ListeningPoint udpListeningPoint = sipStack.createListeningPoint(sipConfig.getSipIp(), sipConfig.getSipPort(), "UDP"); SipProvider udpSipProvider = sipStack.createSipProvider(udpListeningPoint); udpSipProvider.addSipListener(this); logger.info("Sip Server UDP 启动成功 port {" + sipConfig.getSipPort() + "}"); return udpSipProvider; } /** @@ -119,8 +117,10 @@ */ @Override public void processRequest(RequestEvent evt) { ISIPRequestProcessor processor = processorFactory.createRequestProcessor(evt); processor.process(evt, this); // JAIN-SIP is single-threaded, so requests are processed concurrently to improve performance processThreadPool.execute(() -> { processorFactory.createRequestProcessor(evt).process(); }); } @Override @@ -210,53 +210,6 @@ public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) { // TODO Auto-generated method stub } public ServerTransaction getServerTransaction(RequestEvent evt) { Request request = evt.getRequest(); ServerTransaction serverTransaction = evt.getServerTransaction(); // determine whether the transport is TCP or UDP boolean isTcp = false; ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME); String transport = reqViaHeader.getTransport(); if (transport.equals("TCP")) { isTcp = true; } if (serverTransaction == null) { try { if (isTcp) { serverTransaction = tcpSipProvider.getNewServerTransaction(request); } else { serverTransaction = udpSipProvider.getNewServerTransaction(request); } } catch (TransactionAlreadyExistsException e) { e.printStackTrace(); } catch (TransactionUnavailableException e) { e.printStackTrace(); } } return serverTransaction; } public AddressFactory getAddressFactory() { return addressFactory; } public HeaderFactory getHeaderFactory() { return headerFactory; } public MessageFactory getMessageFactory() { return messageFactory; } public SipProvider getTcpSipProvider() { return tcpSipProvider; } public SipProvider getUdpSipProvider() { return udpSipProvider; } } 
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
@@ -2,13 +2,23 @@ import javax.sip.RequestEvent; import javax.sip.ResponseEvent; import javax.sip.SipProvider; import javax.sip.header.CSeqHeader; import javax.sip.message.Request; import javax.sip.message.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.auth.RegisterLogicHandler; import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.request.impl.AckRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.request.impl.ByeRequestProcessor; @@ -23,6 +33,8 @@ import com.genersoft.iot.vmp.gb28181.transmit.response.impl.CancelResponseProcessor; import com.genersoft.iot.vmp.gb28181.transmit.response.impl.InviteResponseProcessor; import com.genersoft.iot.vmp.gb28181.transmit.response.impl.OtherResponseProcessor; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.utils.redis.RedisUtil; /** * @Description:TODO(这里用一句话描述这个类的作用) @@ -32,29 +44,31 @@ @Component public class SIPProcessorFactory { @Autowired private InviteRequestProcessor inviteRequestProcessor; private final static Logger logger = LoggerFactory.getLogger(SIPProcessorFactory.class); @Autowired private RegisterRequestProcessor registerRequestProcessor; private SipConfig sipConfig; @Autowired private SubscribeRequestProcessor subscribeRequestProcessor; private RegisterLogicHandler handler; @Autowired private AckRequestProcessor ackRequestProcessor; private 
IVideoManagerStorager storager; @Autowired private ByeRequestProcessor byeRequestProcessor; private EventPublisher publisher; @Autowired private CancelRequestProcessor cancelRequestProcessor; private SIPCommander cmder; @Autowired private MessageRequestProcessor messageRequestProcessor; private RedisUtil redis; @Autowired private OtherRequestProcessor otherRequestProcessor; private DeferredResultHolder deferredResultHolder; @Autowired private DeviceOffLineDetector offLineDetector; @Autowired private InviteResponseProcessor inviteResponseProcessor; @@ -68,27 +82,64 @@ @Autowired private OtherResponseProcessor otherResponseProcessor; @Autowired @Qualifier(value="tcpSipProvider") private SipProvider tcpSipProvider; @Autowired @Qualifier(value="udpSipProvider") private SipProvider udpSipProvider; /** Creates a new processor for the SIP method of {@code evt}, stores the event on it and hands it the collaborators its {@code process} implementation uses. @param evt the incoming request event @return a processor bound to {@code evt}; unsupported methods get an OtherRequestProcessor */ public ISIPRequestProcessor createRequestProcessor(RequestEvent evt) { Request request = evt.getRequest(); String method = request.getMethod(); logger.info("接收到消息:"+request.getMethod()); if (Request.INVITE.equals(method)) { return inviteRequestProcessor; InviteRequestProcessor processor = new InviteRequestProcessor(); processor.setRequestEvent(evt); processor.setTcpSipProvider(tcpSipProvider); processor.setUdpSipProvider(udpSipProvider); return processor; } else if (Request.REGISTER.equals(method)) { return registerRequestProcessor; RegisterRequestProcessor processor = new RegisterRequestProcessor(); processor.setRequestEvent(evt); processor.setTcpSipProvider(tcpSipProvider); processor.setUdpSipProvider(udpSipProvider); processor.setHandler(handler); processor.setPublisher(publisher); processor.setSipConfig(sipConfig); processor.setVideoManagerStorager(storager); return processor; } else if (Request.SUBSCRIBE.equals(method)) { return subscribeRequestProcessor; SubscribeRequestProcessor processor = new SubscribeRequestProcessor(); processor.setRequestEvent(evt); /* SUBSCRIBE answers through getServerTransaction(evt), which needs the providers when the event carries no transaction */ processor.setTcpSipProvider(tcpSipProvider); processor.setUdpSipProvider(udpSipProvider); return processor; } else if (Request.ACK.equals(method)) { return ackRequestProcessor; AckRequestProcessor processor 
= new AckRequestProcessor(); processor.setRequestEvent(evt); return processor; } else if (Request.BYE.equals(method)) { return byeRequestProcessor; ByeRequestProcessor processor = new ByeRequestProcessor(); processor.setRequestEvent(evt); return processor; } else if (Request.CANCEL.equals(method)) { return cancelRequestProcessor; CancelRequestProcessor processor = new CancelRequestProcessor(); processor.setRequestEvent(evt); return processor; } else if (Request.MESSAGE.equals(method)) { return messageRequestProcessor; MessageRequestProcessor processor = new MessageRequestProcessor(); processor.setRequestEvent(evt); processor.setTcpSipProvider(tcpSipProvider); processor.setUdpSipProvider(udpSipProvider); processor.setPublisher(publisher); processor.setRedis(redis); processor.setDeferredResultHolder(deferredResultHolder); processor.setOffLineDetector(offLineDetector); processor.setCmder(cmder); processor.setStorager(storager); return processor; } else { return otherRequestProcessor; /* the no-arg process() forwards the stored event, so it must be set or process(evt) dereferences null */ OtherRequestProcessor processor = new OtherRequestProcessor(); processor.setRequestEvent(evt); return processor; } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/ISIPRequestProcessor.java
@@ -1,9 +1,5 @@ package com.genersoft.iot.vmp.gb28181.transmit.request; import javax.sip.RequestEvent; import com.genersoft.iot.vmp.gb28181.SipLayer; /** * @Description:处理接收IPCamera发来的SIP协议请求消息 * @author: swwheihei @@ -11,6 +7,6 @@ */ public interface ISIPRequestProcessor { public void process(RequestEvent evt, SipLayer layer); public void process(); } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/SIPRequestAbstractProcessor.java
New file @@ -0,0 +1,127 @@
package com.genersoft.iot.vmp.gb28181.transmit.request;

import javax.sip.PeerUnavailableException;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipFactory;
import javax.sip.SipProvider;
import javax.sip.TransactionAlreadyExistsException;
import javax.sip.TransactionUnavailableException;
import javax.sip.address.AddressFactory;
import javax.sip.header.HeaderFactory;
import javax.sip.header.ViaHeader;
import javax.sip.message.MessageFactory;
import javax.sip.message.Request;

import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.stack.SIPServerTransaction;

/**
 * @Description: Base class for processors that handle SIP requests received from IP cameras.
 *               It stores the request event and the TCP/UDP providers and offers helpers for
 *               obtaining the server transaction and the JAIN-SIP factories.
 * @author: songww
 * @date: 2020-05-03 16:42:22
 */
public abstract class SIPRequestAbstractProcessor implements ISIPRequestProcessor {

    /** The request event this processor was created for; forwarded by {@link #process()}. */
    protected RequestEvent evt;

    private SipProvider tcpSipProvider;

    private SipProvider udpSipProvider;

    /**
     * Processes the event previously stored with {@link #setRequestEvent(RequestEvent)}.
     */
    @Override
    public void process() {
        this.process(evt);
    }

    /**
     * Handles the given request event.
     *
     * @param evt the request event to handle
     */
    public abstract void process(RequestEvent evt);

    /**
     * Returns the server transaction for the request carried by {@code evt}.
     * <p>
     * The transaction attached to the event is used when present. Otherwise the provider matching
     * the transport of the request's Via header is asked: an existing transaction found on the
     * stack is reused, and only if none exists is a new one created.
     *
     * @param evt the request event
     * @return the server transaction, or {@code null} if none could be found or created
     * @throws IllegalStateException if the provider for the request's transport was never set
     */
    public ServerTransaction getServerTransaction(RequestEvent evt) {
        ServerTransaction serverTransaction = evt.getServerTransaction();
        if (serverTransaction != null) {
            return serverTransaction;
        }

        Request request = evt.getRequest();
        // Constant-first and case-insensitive: a missing header or transport, or a lower-case
        // "tcp", must not blow up or be misrouted; anything that is not TCP is handled as UDP.
        ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
        boolean isTcp = reqViaHeader != null && "TCP".equalsIgnoreCase(reqViaHeader.getTransport());

        SipProvider provider = isTcp ? tcpSipProvider : udpSipProvider;
        if (provider == null) {
            throw new IllegalStateException(
                    "No " + (isTcp ? "TCP" : "UDP") + " SipProvider set on " + getClass().getSimpleName());
        }

        try {
            // Look for an existing transaction first so a retransmitted request does not
            // fail with TransactionAlreadyExistsException.
            SipStackImpl stack = (SipStackImpl) provider.getSipStack();
            serverTransaction = (SIPServerTransaction) stack.findTransaction((SIPRequest) request, true);
            if (serverTransaction == null) {
                serverTransaction = provider.getNewServerTransaction(request);
            }
        } catch (TransactionAlreadyExistsException | TransactionUnavailableException e) {
            e.printStackTrace();
        }
        return serverTransaction;
    }

    /**
     * @return an address factory from the shared {@link SipFactory}, or {@code null} if it cannot be created
     */
    public AddressFactory getAddressFactory() {
        try {
            return SipFactory.getInstance().createAddressFactory();
        } catch (PeerUnavailableException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * @return a header factory from the shared {@link SipFactory}, or {@code null} if it cannot be created
     */
    public HeaderFactory getHeaderFactory() {
        try {
            return SipFactory.getInstance().createHeaderFactory();
        } catch (PeerUnavailableException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * @return a message factory from the shared {@link SipFactory}, or {@code null} if it cannot be created
     */
    public MessageFactory getMessageFactory() {
        try {
            return SipFactory.getInstance().createMessageFactory();
        } catch (PeerUnavailableException e) {
            e.printStackTrace();
        }
        return null;
    }

    public RequestEvent getRequestEvent() {
        return evt;
    }

    public void setRequestEvent(RequestEvent evt) {
        this.evt = evt;
    }

    public SipProvider getTcpSipProvider() {
        return tcpSipProvider;
    }

    public void setTcpSipProvider(SipProvider tcpSipProvider) {
        this.tcpSipProvider = tcpSipProvider;
    }

    public SipProvider getUdpSipProvider() {
        return udpSipProvider;
    }

    public void setUdpSipProvider(SipProvider udpSipProvider) {
        this.udpSipProvider = udpSipProvider;
    }

}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java
@@ -3,14 +3,10 @@ import javax.sip.Dialog; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.ServerTransaction; import javax.sip.SipException; import javax.sip.message.Request; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.transmit.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; import gov.nist.javax.sip.header.CSeq; @@ -19,8 +15,7 @@ * @author: swwheihei * @date: 2020年5月3日 下午5:31:45 */ @Component public class AckRequestProcessor implements ISIPRequestProcessor { public class AckRequestProcessor extends SIPRequestAbstractProcessor { /** * 处理 ACK请求 @@ -31,7 +26,7 @@ * @param config */ @Override public void process(RequestEvent evt, SipLayer layer) { public void process(RequestEvent evt) { Request request = evt.getRequest(); Dialog dialog = evt.getDialog(); try { src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java
@@ -1,20 +1,15 @@ package com.genersoft.iot.vmp.gb28181.transmit.request.impl; import javax.sip.RequestEvent; import javax.sip.ServerTransaction; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.transmit.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; /** * @Description: BYE请求处理器 * @author: swwheihei * @date: 2020年5月3日 下午5:32:05 */ @Component public class ByeRequestProcessor implements ISIPRequestProcessor { public class ByeRequestProcessor extends SIPRequestAbstractProcessor { /** * 处理BYE请求 @@ -25,8 +20,8 @@ * @param config */ @Override public void process(RequestEvent evt, SipLayer layer) { // TODO Auto-generated method stub public void process(RequestEvent evt) { // TODO 优先级99 Bye Request消息实现,此消息一般为级联消息,上级给下级发送视频停止指令 } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/CancelRequestProcessor.java
@@ -1,20 +1,15 @@ package com.genersoft.iot.vmp.gb28181.transmit.request.impl; import javax.sip.RequestEvent; import javax.sip.ServerTransaction; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.transmit.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; /** * @Description:CANCEL请求处理器 * @author: swwheihei * @date: 2020年5月3日 下午5:32:23 */ @Component public class CancelRequestProcessor implements ISIPRequestProcessor { public class CancelRequestProcessor extends SIPRequestAbstractProcessor { /** * 处理CANCEL请求 @@ -25,8 +20,8 @@ * @param config */ @Override public void process(RequestEvent evt, SipLayer layer) { // TODO Auto-generated method stub public void process(RequestEvent evt) { // TODO 优先级99 Cancel Request消息实现,此消息一般为级联消息,上级给下级发送请求取消指令 } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java
@@ -1,20 +1,15 @@ package com.genersoft.iot.vmp.gb28181.transmit.request.impl; import javax.sip.RequestEvent; import javax.sip.ServerTransaction; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.transmit.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; /** * @Description:处理INVITE请求 * @author: swwheihei * @date: 2020年5月3日 下午4:43:52 */ @Component public class InviteRequestProcessor implements ISIPRequestProcessor { public class InviteRequestProcessor extends SIPRequestAbstractProcessor { /** * 处理invite请求 @@ -23,8 +18,8 @@ * 请求消息 */ @Override public void process(RequestEvent evt, SipLayer layer) { // TODO Auto-generated method stub public void process(RequestEvent evt) { // TODO 优先级99 Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 // Request request = requestEvent.getRequest(); // // try { @@ -45,7 +40,6 @@ // Via via = (Via) headerFactory.createViaHeader(SIPMain.ip, SIPMain.port, "UDP", // callerVia.getBranch() + "sipphone"); // // // FIXME 需要测试是否能够通过设置VIA头域来修改VIA头域值 // cliReq.removeHeader(Via.NAME); // cliReq.addHeader(via); // src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java
@@ -10,7 +10,6 @@ import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.ServerTransaction; import javax.sip.SipException; import javax.sip.message.Request; import javax.sip.message.Response; @@ -22,10 +21,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; @@ -35,7 +32,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; import com.genersoft.iot.vmp.gb28181.utils.DateUtil; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; @@ -46,38 +43,28 @@ * @author: swwheihei * @date: 2020年5月3日 下午5:32:41 */ @Component public class MessageRequestProcessor implements ISIPRequestProcessor { public class MessageRequestProcessor extends SIPRequestAbstractProcessor { private final static Logger logger = LoggerFactory.getLogger(MessageRequestProcessor.class); private ServerTransaction transaction; private SipLayer layer; @Autowired private SIPCommander cmder; @Autowired private IVideoManagerStorager storager; @Autowired private EventPublisher publisher; @Autowired private RedisUtil redis; @Autowired private DeferredResultHolder deferredResultHolder; @Autowired private DeviceOffLineDetector offLineDetector; private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_"; private 
static final String MESSAGE_KEEP_ALIVE = "Keepalive"; private static final String MESSAGE_CONFIG_DOWNLOAD = "ConfigDownload"; private static final String MESSAGE_CATALOG = "Catalog"; private static final String MESSAGE_DEVICE_INFO = "DeviceInfo"; private static final String MESSAGE_KEEP_ALIVE = "Keepalive"; private static final String MESSAGE_ALARM = "Alarm"; private static final String MESSAGE_RECORD_INFO = "RecordInfo"; // private static final String MESSAGE_BROADCAST = "Broadcast"; @@ -93,23 +80,17 @@ * @param transaction */ @Override public void process(RequestEvent evt, SipLayer layer) { this.layer = layer; this.transaction = layer.getServerTransaction(evt); Request request = evt.getRequest(); SAXReader reader = new SAXReader(); reader.setEncoding("gbk"); Document xml; public void process(RequestEvent evt) { try { xml = reader.read(new ByteArrayInputStream(request.getRawContent())); Element rootElement = xml.getRootElement(); String cmd = rootElement.element("CmdType").getStringValue(); Element rootElement = getRootElement(evt); String cmd = XmlUtil.getText(rootElement,"CmdType"); if (MESSAGE_KEEP_ALIVE.equals(cmd)) { logger.info("接收到KeepAlive消息"); processMessageKeepAlive(evt); } else if (MESSAGE_CONFIG_DOWNLOAD.equals(cmd)) { logger.info("接收到ConfigDownload消息"); } else if (MESSAGE_CATALOG.equals(cmd)) { logger.info("接收到Catalog消息"); processMessageCatalogList(evt); @@ -126,7 +107,6 @@ } catch (DocumentException e) { e.printStackTrace(); } } /** @@ -273,15 +253,11 @@ try { Element rootElement = getRootElement(evt); String deviceId = XmlUtil.getText(rootElement,"DeviceID"); Request request = evt.getRequest(); Response response = null; if (offLineDetector.isOnline(deviceId)) { response = layer.getMessageFactory().createResponse(Response.OK,request); responseAck(evt); publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE); } else { response = layer.getMessageFactory().createResponse(Response.BAD_REQUEST,request); } 
transaction.sendResponse(response); } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { e.printStackTrace(); } @@ -373,6 +349,11 @@ } } private void responseAck(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException { Response response = getMessageFactory().createResponse(Response.OK,evt.getRequest()); getServerTransaction(evt).sendResponse(response); } private Element getRootElement(RequestEvent evt) throws DocumentException { Request request = evt.getRequest(); SAXReader reader = new SAXReader(); @@ -381,4 +362,28 @@ return xml.getRootElement(); } public void setCmder(SIPCommander cmder) { this.cmder = cmder; } public void setStorager(IVideoManagerStorager storager) { this.storager = storager; } public void setPublisher(EventPublisher publisher) { this.publisher = publisher; } public void setRedis(RedisUtil redis) { this.redis = redis; } public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) { this.deferredResultHolder = deferredResultHolder; } public void setOffLineDetector(DeviceOffLineDetector offLineDetector) { this.offLineDetector = offLineDetector; } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/OtherRequestProcessor.java
@@ -1,20 +1,15 @@ package com.genersoft.iot.vmp.gb28181.transmit.request.impl; import javax.sip.RequestEvent; import javax.sip.ServerTransaction; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.transmit.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; /** * @Description:暂不支持的消息请求处理器 * @author: swwheihei * @date: 2020年5月3日 下午5:32:59 */ @Component public class OtherRequestProcessor implements ISIPRequestProcessor { public class OtherRequestProcessor extends SIPRequestAbstractProcessor { /** * <p>Title: process</p> @@ -25,7 +20,7 @@ * @param config */ @Override public void process(RequestEvent evt, SipLayer layer) { public void process(RequestEvent evt) { System.out.println("no support the method! Method:" + evt.getRequest().getMethod()); } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/RegisterRequestProcessor.java
@@ -7,7 +7,6 @@ import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.ServerTransaction; import javax.sip.SipException; import javax.sip.header.AuthorizationHeader; import javax.sip.header.ContactHeader; @@ -17,19 +16,16 @@ import javax.sip.message.Request; import javax.sip.message.Response; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.auth.DigestServerAuthenticationHelper; import com.genersoft.iot.vmp.gb28181.auth.RegisterLogicHandler; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Host; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import gov.nist.javax.sip.address.AddressImpl; @@ -41,19 +37,14 @@ * @author: swwheihei * @date: 2020年5月3日 下午4:47:25 */ @Component public class RegisterRequestProcessor implements ISIPRequestProcessor { public class RegisterRequestProcessor extends SIPRequestAbstractProcessor { @Autowired private SipConfig sipConfig; @Autowired private RegisterLogicHandler handler; @Autowired private IVideoManagerStorager storager; @Autowired private EventPublisher publisher; /*** @@ -63,7 +54,7 @@ * 请求消息 */ @Override public void process(RequestEvent evt, SipLayer layer) { public void process(RequestEvent evt) { try { System.out.println("收到注册请求,开始处理"); Request request = evt.getRequest(); @@ -88,14 +79,14 @@ } else if (!passwordCorrect) { System.out.println("密码错误 回复401"); } response = layer.getMessageFactory().createResponse(Response.UNAUTHORIZED, 
request); new DigestServerAuthenticationHelper().generateChallenge(layer.getHeaderFactory(), response, sipConfig.getSipDomain()); response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request); new DigestServerAuthenticationHelper().generateChallenge(getHeaderFactory(), response, sipConfig.getSipDomain()); } // 携带授权头并且密码正确 else if (passwordCorrect) { response = layer.getMessageFactory().createResponse(Response.OK, request); response = getMessageFactory().createResponse(Response.OK, request); // 添加date头 response.addHeader(layer.getHeaderFactory().createDateHeader(Calendar.getInstance(Locale.ENGLISH))); response.addHeader(getHeaderFactory().createDateHeader(Calendar.getInstance(Locale.ENGLISH))); ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME); // 添加Contact头 response.addHeader(request.getHeader(ContactHeader.NAME)); @@ -141,7 +132,7 @@ device.setTransport(isTcp ? "TCP" : "UDP"); } } layer.getServerTransaction(evt).sendResponse(response); getServerTransaction(evt).sendResponse(response); // 注册成功 // 保存到redis // 下发catelog查询目录 @@ -159,5 +150,21 @@ } } public void setSipConfig(SipConfig sipConfig) { this.sipConfig = sipConfig; } public void setHandler(RegisterLogicHandler handler) { this.handler = handler; } public void setVideoManagerStorager(IVideoManagerStorager storager) { this.storager = storager; } public void setPublisher(EventPublisher publisher) { this.publisher = publisher; } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/SubscribeRequestProcessor.java
@@ -10,18 +10,14 @@ import javax.sip.message.Request; import javax.sip.message.Response; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.transmit.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; /** * @Description:SUBSCRIBE请求处理器 * @author: swwheihei * @date: 2020年5月3日 下午5:31:20 */ @Component public class SubscribeRequestProcessor implements ISIPRequestProcessor { public class SubscribeRequestProcessor extends SIPRequestAbstractProcessor { /** * 处理SUBSCRIBE请求 @@ -32,18 +28,18 @@ * @param config */ @Override public void process(RequestEvent evt, SipLayer layer) { public void process(RequestEvent evt) { Request request = evt.getRequest(); try { Response response = null; response = layer.getMessageFactory().createResponse(200, request); response = getMessageFactory().createResponse(200, request); if (response != null) { ExpiresHeader expireHeader = layer.getHeaderFactory().createExpiresHeader(30); ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30); response.setExpires(expireHeader); } System.out.println("response : " + response.toString()); ServerTransaction transaction = layer.getServerTransaction(evt); ServerTransaction transaction = getServerTransaction(evt); if (transaction != null) { transaction.sendResponse(response); transaction.terminate(); src/main/java/com/genersoft/iot/vmp/vmanager/device/entity/Device.java
New file @@ -0,0 +1,401 @@
package com.genersoft.iot.vmp.vmanager.device.entity;

import java.util.List;

import javax.persistence.Column;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.Transient;
import javax.validation.constraints.Max;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;

/**
 * Video device information, mapped to the {@code VMP_VIDEODEVICES} table.
 *
 * <p>Plain mutable bean: every persistent column has a getter/setter pair and
 * the accessors perform no validation themselves; the constraints declared on
 * the fields are only metadata for a Bean Validation provider.
 *
 * @author songww
 * @date 2020-05-08 14:05:56
 */
@ApiModel(value = "视频设备信息", description = "视频设备信息")
@Table(name = "VMP_VIDEODEVICES")
public class Device {

    /** Placeholder printed by {@link #toString()} instead of the real password. */
    private static final String MASKED = "******";

    /** Device id (primary key). */
    @ApiModelProperty("设备编号")
    @Id
    @Column(name = "DEVICE_ID")
    @NotNull(message = "deviceId 不能为 null")
    @Size(min = 4, max = 32, message = "deviceId 必须大于 4 位并且小于 32 位")
    private String deviceId;

    /** Device name. */
    @ApiModelProperty("设备名称")
    @Column(name = "DEVICE_NAME")
    @Size(max = 32, message = "deviceName 必须小于 32 位")
    private String deviceName;

    /** Manufacturer. */
    @ApiModelProperty("生产厂商")
    @Column(name = "MANUFACTURER")
    @Size(max = 64, message = "manufacturer 必须小于 64 位")
    private String manufacturer;

    /** Model. */
    @ApiModelProperty("型号")
    @Column(name = "MODEL")
    // Message previously named "manufacturer" (copy-paste slip); it must name this field.
    @Size(max = 64, message = "model 必须小于 64 位")
    private String model;

    /** Firmware version. */
    @ApiModelProperty("固件版本")
    @Column(name = "FIRMWARE")
    @Size(max = 64, message = "firmware 必须小于 64 位")
    private String firmware;

    /** Communication protocol, e.g. GB28181 or ONVIF. */
    @ApiModelProperty("通信协议")
    @Column(name = "PROTOCOL")
    @NotNull(message = "protocol 不能为 null")
    @Size(max = 16, message = "protocol 必须小于 16 位")
    private String protocol;

    /** SIP transport protocol: UDP or TCP. */
    @ApiModelProperty("SIP 传输协议")
    @Column(name = "TRANSPORT")
    @Size(min = 3, max = 3, message = "transport 必须为 3 位")
    private String transport;

    /**
     * Media stream transport mode.
     * <ul>
     *   <li>UDP: UDP transport</li>
     *   <li>TCP-ACTIVE: TCP active mode</li>
     *   <li>TCP-PASSIVE: TCP passive mode</li>
     * </ul>
     */
    @ApiModelProperty("数据流传输模式")
    @Column(name = "STREAM_MODE")
    // Message previously said 16 while the enforced limit is 64; the text now matches the
    // constraint. NOTE(review): if 16 was the intended limit, lower max instead - confirm
    // against the STREAM_MODE column width.
    @Size(max = 64, message = "streamMode 必须小于 64 位")
    private String streamMode;

    /** IP address. */
    @ApiModelProperty("IP地址")
    @Column(name = "IP")
    // Message previously named "streamMode" (copy-paste slip); it must name this field.
    @Size(max = 15, message = "ip 必须小于 15 位")
    private String ip;

    /** Port number. */
    @ApiModelProperty("端口号")
    @Column(name = "PORT")
    @Max(value = 65535, message = "port 最大值为 65535")
    private Integer port;

    /** Online status: 1 = online, 0 = offline. */
    @ApiModelProperty("在线状态")
    @Size(min = 1, max = 1, message = "online 必须为 1 位")
    @Column(name = "ONLINE")
    private String online;

    /** Number of channels. */
    @ApiModelProperty("通道数量")
    @Column(name = "CHANNEL_SUM")
    @Max(value = 1000000000, message = "channelSum 最大值为 1000000000")
    private Integer channelSum;

    /** Creation time. */
    @ApiModelProperty("创建时间")
    @Column(name = "CREATE_TIME")
    private String createTime;

    /** Registration time. */
    @ApiModelProperty("注册时间")
    @Column(name = "REGISTER_TIME")
    private String registerTime;

    /** Heartbeat time. */
    @ApiModelProperty("心跳时间")
    @Column(name = "HEARTBEAT_TIME")
    private String heartbeatTime;

    /** Last modification time. */
    @ApiModelProperty("更新时间")
    @Column(name = "UPDATE_TIME")
    private String updateTime;

    /** Person who last modified the record. */
    @ApiModelProperty("修改人")
    @Column(name = "UPDATE_PERSON")
    private String updatePerson;

    /** Synchronization time. */
    @ApiModelProperty("同步时间")
    @Column(name = "SYNC_TIME")
    private String syncTime;

    /** Person who triggered the synchronization. */
    @ApiModelProperty("同步人")
    @Column(name = "SYNC_PERSON")
    private String syncPerson;

    /** User name (ONVIF protocol). */
    @ApiModelProperty("用户名")
    @Column(name = "USERNAME")
    @Size(max = 32, message = "username 必须小于 32 位")
    private String username;

    /** Password (ONVIF protocol). */
    @ApiModelProperty("密码")
    @Size(max = 32, message = "password 必须小于 32 位")
    @Column(name = "PASSWORD")
    private String password;

    /** Channels belonging to this device; marked transient, so not a mapped column. */
    @Transient
    private List<DeviceChannel> channelList;

    // ---------------------------------------------------------------------
    // Plain accessors: each one reads or assigns the backing field unchanged.
    // ---------------------------------------------------------------------

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public String getDeviceName() {
        return deviceName;
    }

    public void setDeviceName(String deviceName) {
        this.deviceName = deviceName;
    }

    public String getManufacturer() {
        return manufacturer;
    }

    public void setManufacturer(String manufacturer) {
        this.manufacturer = manufacturer;
    }

    public String getModel() {
        return model;
    }

    public void setModel(String model) {
        this.model = model;
    }

    public String getFirmware() {
        return firmware;
    }

    public void setFirmware(String firmware) {
        this.firmware = firmware;
    }

    public String getProtocol() {
        return protocol;
    }

    public void setProtocol(String protocol) {
        this.protocol = protocol;
    }

    public String getTransport() {
        return transport;
    }

    public void setTransport(String transport) {
        this.transport = transport;
    }

    public String getStreamMode() {
        return streamMode;
    }

    public void setStreamMode(String streamMode) {
        this.streamMode = streamMode;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public String getOnline() {
        return online;
    }

    public void setOnline(String online) {
        this.online = online;
    }

    public Integer getChannelSum() {
        return channelSum;
    }

    public void setChannelSum(Integer channelSum) {
        this.channelSum = channelSum;
    }

    public String getCreateTime() {
        return createTime;
    }

    public void setCreateTime(String createTime) {
        this.createTime = createTime;
    }

    public String getRegisterTime() {
        return registerTime;
    }

    public void setRegisterTime(String registerTime) {
        this.registerTime = registerTime;
    }

    public String getHeartbeatTime() {
        return heartbeatTime;
    }

    public void setHeartbeatTime(String heartbeatTime) {
        this.heartbeatTime = heartbeatTime;
    }

    public String getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(String updateTime) {
        this.updateTime = updateTime;
    }

    public String getUpdatePerson() {
        return updatePerson;
    }

    public void setUpdatePerson(String updatePerson) {
        this.updatePerson = updatePerson;
    }

    public String getSyncTime() {
        return syncTime;
    }

    public void setSyncTime(String syncTime) {
        this.syncTime = syncTime;
    }

    public String getSyncPerson() {
        return syncPerson;
    }

    public void setSyncPerson(String syncPerson) {
        this.syncPerson = syncPerson;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public List<DeviceChannel> getChannelList() {
        return channelList;
    }

    public void setChannelList(List<DeviceChannel> channelList) {
        this.channelList = channelList;
    }

    /**
     * Renders every field as {@code Device{name='value', ...}} for diagnostics.
     *
     * <p>The password is never printed: a non-null password is replaced by a
     * fixed mask so that logging a device cannot leak the credential, while a
     * {@code null} password still shows as {@code null}.
     *
     * @return a single-line description of this device
     */
    @Override
    public String toString() {
        // Keep the null/non-null distinction visible without exposing the secret.
        String shownPassword = (password == null) ? "null" : MASKED;
        return "Device{" +
                "deviceId='" + deviceId + '\'' +
                ", deviceName='" + deviceName + '\'' +
                ", manufacturer='" + manufacturer + '\'' +
                ", model='" + model + '\'' +
                ", firmware='" + firmware + '\'' +
                ", protocol='" + protocol + '\'' +
                ", transport='" + transport + '\'' +
                ", streamMode='" + streamMode + '\'' +
                ", ip='" + ip + '\'' +
                ", port=" + port +
                ", online='" + online + '\'' +
                ", channelSum=" + channelSum +
                ", createTime='" + createTime + '\'' +
                ", registerTime='" + registerTime + '\'' +
                ", heartbeatTime='" + heartbeatTime + '\'' +
                ", updateTime='" + updateTime + '\'' +
                ", updatePerson='" + updatePerson + '\'' +
                ", syncTime='" + syncTime + '\'' +
                ", syncPerson='" + syncPerson + '\'' +
                ", username='" + username + '\'' +
                ", password='" + shownPassword + '\'' +
                ", channelList=" + channelList +
                '}';
    }
}
src/main/java/com/genersoft/iot/vmp/vmanager/device/entity/DeviceChannel.java
New file @@ -0,0 +1,385 @@
package com.genersoft.iot.vmp.vmanager.device.entity;

import javax.persistence.Column;
import javax.persistence.Id;
import javax.persistence.Table;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;

/**
 * Device channel information, mapped to the {@code VMP_VIDEOCHANNELS} table.
 *
 * <p>Plain mutable bean; accessors are listed in the same order as the fields
 * and assign or return the backing field unchanged.
 *
 * @author songww
 * @date 2020-05-20 21:00:46
 */
@ApiModel(value = "设备通道信息", description = "设备通道信息")
@Table(name = "VMP_VIDEOCHANNELS")
public class DeviceChannel {

    /** Channel id (primary key). */
    @ApiModelProperty("通道编号")
    @Id
    @Column(name = "CHANNEL_ID")
    private String channelId;

    /** Id of the device this channel belongs to. */
    @ApiModelProperty("设备编号")
    @Column(name = "DEVICE_ID")
    private String deviceId;

    /** Channel name. */
    @ApiModelProperty("通道名")
    @Column(name = "CHANNEL_NAME")
    private String channelName;

    /** Manufacturer. */
    @ApiModelProperty("生产厂商")
    @Column(name = "MANUFACTURER")
    private String manufacture;

    /** Model. */
    @ApiModelProperty("型号")
    @Column(name = "MODEL")
    private String model;

    /** Device owner. */
    @ApiModelProperty("设备归属")
    @Column(name = "OWNER")
    private String owner;

    /** Administrative region code. */
    @ApiModelProperty("行政区域")
    @Column(name = "CIVIL_CODE")
    private String civilCode;

    /** Police district. */
    @ApiModelProperty("警区")
    @Column(name = "BLOCK")
    private String block;

    /** Installation address. */
    @ApiModelProperty("安装地址")
    @Column(name = "ADDRESS")
    private String address;

    /** Whether there are sub-devices: 1 = yes, 0 = no. */
    @ApiModelProperty("是否有子设备")
    @Column(name = "PARENTAL")
    private String parental;

    /** Parent id. */
    @ApiModelProperty("父级编码")
    @Column(name = "PARENT_ID")
    private String parentId;

    /**
     * Signalling security mode, documented as defaulting to 0.
     * 0: not used; 2: S/MIME signature; 3: S/MIME encryption and signature
     * together; 4: digest.
     */
    @ApiModelProperty("信令安全模式")
    @Column(name = "SAFETY_WAY")
    private String safetyWay;

    /**
     * Registration mode, documented as defaulting to 1.
     * 1: authentication per IETF RFC 3261; 2: password-based mutual
     * authentication; 3: certificate-based mutual authentication.
     */
    @ApiModelProperty("注册方式")
    @Column(name = "REGISTER_WAY")
    private String registerWay;

    /** Certificate serial number. */
    @ApiModelProperty("证书序列号")
    @Column(name = "CERT_NUM")
    private String certNum;

    /** Certificate validity flag, documented as defaulting to 0: 0 = invalid, 1 = valid. */
    @ApiModelProperty("证书有效标识")
    @Column(name = "CERT_VALID")
    private String certValid;

    /** Reason code explaining why the certificate is invalid. */
    @ApiModelProperty("证书无效原因码")
    @Column(name = "CERT_ERRCODE")
    private String certErrCode;

    /** End of the certificate validity period. */
    @ApiModelProperty("证书终止有效期")
    @Column(name = "CERT_ENDTIME")
    private String certEndTime;

    /** Secrecy attribute, documented as defaulting to 0: 0 = not classified, 1 = classified. */
    @ApiModelProperty("保密属性")
    @Column(name = "SECRECY")
    private String secrecy;

    /** IP address. */
    @ApiModelProperty("IP地址")
    @Column(name = "IP")
    private String ip;

    /** Port number. */
    @ApiModelProperty("端口号")
    @Column(name = "PORT")
    private Integer port;

    /** Password. */
    @ApiModelProperty("密码")
    @Column(name = "PASSWORD")
    private String password;

    /**
     * Online/offline state: 1 = online, 0 = offline; documented as online by
     * default. The signalling carries {@code <Status>ON</Status>} or
     * {@code <Status>OFF</Status>}. Observed in the field: an IPC behind an
     * NVR could still push a stream on request while reporting Status OFF.
     */
    @ApiModelProperty("状态")
    @Column(name = "ONLINE")
    private String online;

    /** Longitude. */
    @ApiModelProperty("经度")
    @Column(name = "LONGITUDE")
    private double longitude;

    /** Latitude. */
    @ApiModelProperty("纬度")
    @Column(name = "LATITUDE")
    private double latitude;

    // ---------------------------------------------------------------------
    // Identity
    // ---------------------------------------------------------------------

    public String getChannelId() {
        return this.channelId;
    }

    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    public String getDeviceId() {
        return this.deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public String getChannelName() {
        return this.channelName;
    }

    public void setChannelName(String channelName) {
        this.channelName = channelName;
    }

    // ---------------------------------------------------------------------
    // Descriptive attributes
    // ---------------------------------------------------------------------

    public String getManufacture() {
        return this.manufacture;
    }

    public void setManufacture(String manufacture) {
        this.manufacture = manufacture;
    }

    public String getModel() {
        return this.model;
    }

    public void setModel(String model) {
        this.model = model;
    }

    public String getOwner() {
        return this.owner;
    }

    public void setOwner(String owner) {
        this.owner = owner;
    }

    public String getCivilCode() {
        return this.civilCode;
    }

    public void setCivilCode(String civilCode) {
        this.civilCode = civilCode;
    }

    public String getBlock() {
        return this.block;
    }

    public void setBlock(String block) {
        this.block = block;
    }

    public String getAddress() {
        return this.address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getParental() {
        return this.parental;
    }

    public void setParental(String parental) {
        this.parental = parental;
    }

    public String getParentId() {
        return this.parentId;
    }

    public void setParentId(String parentId) {
        this.parentId = parentId;
    }

    // ---------------------------------------------------------------------
    // Security and registration
    // ---------------------------------------------------------------------

    public String getSafetyWay() {
        return this.safetyWay;
    }

    public void setSafetyWay(String safetyWay) {
        this.safetyWay = safetyWay;
    }

    public String getRegisterWay() {
        return this.registerWay;
    }

    public void setRegisterWay(String registerWay) {
        this.registerWay = registerWay;
    }

    public String getCertNum() {
        return this.certNum;
    }

    public void setCertNum(String certNum) {
        this.certNum = certNum;
    }

    public String getCertValid() {
        return this.certValid;
    }

    public void setCertValid(String certValid) {
        this.certValid = certValid;
    }

    public String getCertErrCode() {
        return this.certErrCode;
    }

    public void setCertErrCode(String certErrCode) {
        this.certErrCode = certErrCode;
    }

    public String getCertEndTime() {
        return this.certEndTime;
    }

    public void setCertEndTime(String certEndTime) {
        this.certEndTime = certEndTime;
    }

    public String getSecrecy() {
        return this.secrecy;
    }

    public void setSecrecy(String secrecy) {
        this.secrecy = secrecy;
    }

    // ---------------------------------------------------------------------
    // Network, state and location
    // ---------------------------------------------------------------------

    public String getIp() {
        return this.ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public Integer getPort() {
        return this.port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public String getPassword() {
        return this.password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getOnline() {
        return this.online;
    }

    public void setOnline(String online) {
        this.online = online;
    }

    public double getLongitude() {
        return this.longitude;
    }

    public void setLongitude(double longitude) {
        this.longitude = longitude;
    }

    public double getLatitude() {
        return this.latitude;
    }

    public void setLatitude(double latitude) {
        this.latitude = latitude;
    }
}