From ddb36e54bd51761138c536ccca889d3f80182334 Mon Sep 17 00:00:00 2001 From: lin <18010473990@163.com> Date: 星期六, 08 一月 2022 16:47:20 +0800 Subject: [PATCH] 级联平台添加GPS订阅支持 --- src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java | 78 +++++++ src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java | 1 src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java | 52 +++++ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java | 136 ++++++++++++- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java | 49 ++++ src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java | 23 +- src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java | 4 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 4 src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java | 8 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java | 28 ++ src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 43 ++++ src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java | 2 src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java | 17 + src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java | 41 ++++ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java | 10 + src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java | 7 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 10 src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java | 59 +++++ 18 files changed, 530 insertions(+), 42 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index e45055b..7f659ab 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -58,6 +58,10 @@ public static final String SIP_CSEQ_PREFIX = "VMP_SIP_CSEQ_"; + public static final String SIP_SN_PREFIX = "VMP_SIP_SN_"; + + public static final String SIP_SUBSCRIBE_PREFIX = "SIP_SUBSCRIBE_"; + //************************** redis 娑堟伅********************************* public static final String WVP_MSG_STREAM_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_"; public static final String WVP_MSG_GPS_PREFIX = "WVP_MSG_GPS_"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java new file mode 100644 index 0000000..ed5cad6 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java @@ -0,0 +1,8 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +public class CmdType { + + public static final String CATALOG = "Catalog"; + public static final String ALARM = "Alarm"; + public static final String MOBILE_POSITION = "MobilePosition"; +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java new file mode 100644 index 0000000..66d67bf --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java @@ -0,0 +1,78 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import javax.sip.RequestEvent; +import javax.sip.header.*; +import javax.sip.message.Request; + +public class SubscribeInfo { + + public SubscribeInfo() { + } + + public SubscribeInfo(RequestEvent evt, String id) { + this.id = id; + Request request = evt.getRequest(); + CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); + this.callId = callIdHeader.getCallId(); + FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME); + this.fromTag = fromHeader.getTag(); + ExpiresHeader expiresHeader = (ExpiresHeader)request.getHeader(ExpiresHeader.NAME); + this.expires = expiresHeader.getExpires(); + this.event = (EventHeader)request.getHeader(EventHeader.NAME); + } + + private String id; + private int expires; + private String callId; + private EventHeader event; + private String fromTag; + private String toTag; + + public String getId() { + return id; + } + + public int getExpires() { + return expires; + } + + public String getCallId() { + return callId; + } + + public EventHeader getEvent() { + return event; + } + + public String getFromTag() { + return fromTag; + } + + public void setToTag(String toTag) { + this.toTag = toTag; + } + + public String getToTag() { + return toTag; + } + + public void setId(String id) { + this.id = id; + } + + public void setExpires(int expires) { + this.expires = expires; + } + + public void setCallId(String callId) { + this.callId = callId; + } + + public void setEvent(EventHeader event) { + this.event = event; + } + + public void setFromTag(String fromTag) { + this.fromTag = fromTag; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java new file mode 100644 index 0000000..bac8e3d --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java @@ -0,0 +1,52 @@ +package com.genersoft.iot.vmp.gb28181.event.subscribe; + +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener; +import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import org.checkerframework.checker.units.qual.A; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.stereotype.Component; + +/** + * 骞冲彴璁㈤槄鍒版湡浜嬩欢 + */ +@Component +public class SubscribeListenerForPlatform extends RedisKeyExpirationEventMessageListener { + + private Logger logger = LoggerFactory.getLogger(SubscribeListenerForPlatform.class); + + @Autowired + private UserSetup userSetup; + + @Autowired + private DynamicTask dynamicTask; + + public SubscribeListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) { + super(listenerContainer, userSetup); + } + + + /** + * 鐩戝惉澶辨晥鐨刱ey + * @param message + * @param pattern + */ + @Override + public void onMessage(Message message, byte[] pattern) { + // 鑾峰彇澶辨晥鐨刱ey + String expiredKey = message.toString(); + logger.debug(expiredKey); + // 璁㈤槄鍒版湡 + String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_"; + if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) { + // 鍙栨秷瀹氭椂浠诲姟 + dynamicTask.stopCron(expiredKey); + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java new file mode 100644 index 0000000..ce990a0 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java @@ -0,0 +1,59 @@ +package com.genersoft.iot.vmp.gb28181.task; + +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; + +import java.text.SimpleDateFormat; +import java.util.List; + +public class GPSSubscribeTask implements Runnable{ + + private IRedisCatchStorage redisCatchStorage; + private IVideoManagerStorager storager; + private ISIPCommanderForPlatform sipCommanderForPlatform; + private String platformId; + private String sn; + private String key; + + private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + public GPSSubscribeTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorager storager, String platformId, String sn, String key) { + this.redisCatchStorage = redisCatchStorage; + this.storager = storager; + this.platformId = platformId; + this.sn = sn; + this.key = key; + this.sipCommanderForPlatform = sipCommanderForPlatform; + } + + @Override + public void run() { + + SubscribeInfo subscribe = redisCatchStorage.getSubscribe(key); + if (subscribe != null) { + System.out.println("鍙戦�丟PS娑堟伅"); + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); + if (parentPlatform == null || parentPlatform.isStatus()) { + // TODO 鏆傛椂鍙鐞嗚棰戞祦鐨勫洖澶�,鍚庣画澧炲姞瀵瑰浗鏍囪澶囩殑鏀寔 + List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platformId); + if (gbStreams.size() > 0) { + for (GbStream gbStream : gbStreams) { + String gbId = gbStream.getGbId(); + GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId); + if (gpsMsgInfo != null && gbStream.isStatus()) { + // 鍙戦�丟PS娑堟伅 + sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe); + } + } + } + } + } + + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index fe30293..e8b4124 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -2,7 +2,9 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import javax.sip.header.WWWAuthenticateHeader; @@ -61,4 +63,12 @@ */ boolean deviceStatusResponse(ParentPlatform parentPlatform, String sn, String fromTag); + /** + * 鍚戜笂绾у洖澶嶇Щ鍔ㄤ綅缃闃呮秷鎭� + * @param parentPlatform 骞冲彴淇℃伅 + * @param gpsMsgInfo GPS淇℃伅 + * @param subscribeInfo 璁㈤槄鐩稿叧鐨勪俊鎭� + * @return + */ + boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index cc41af7..27125e1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -2,6 +2,7 @@ import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import gov.nist.javax.sip.message.MessageFactoryImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -32,6 +33,9 @@ @Autowired private SipFactory sipFactory; + @Autowired + private IRedisCatchStorage redisCatchStorage; + public Request createKeetpaliveMessageRequest(ParentPlatform parentPlatform, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException { Request request = null; @@ -57,7 +61,7 @@ // Forwards MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); // ceq - CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.MESSAGE); + CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE); request = sipFactory.createMessageFactory().createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards); @@ -122,7 +126,7 @@ String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader) throws ParseException, PeerUnavailableException, InvalidArgumentException { - Request registerRequest = createRegisterRequest(parentPlatform, 2L, fromTag, viaTag, callIdHeader); + Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), fromTag, viaTag, callIdHeader); String realm = www.getRealm(); String nonce = www.getNonce(); @@ -208,7 +212,7 @@ // Forwards MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); // ceq - CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.MESSAGE); + CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE); MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory(); // 璁剧疆缂栫爜锛� 闃叉涓枃涔辩爜 messageFactory.setDefaultContentEncodingCharset("gb2312"); @@ -223,4 +227,43 @@ request.setContent(content, contentTypeHeader); return request; } + + public Request createNotifyRequest(ParentPlatform parentPlatform, String content, String fromTag, String toTag, CallIdHeader callIdHeader) throws PeerUnavailableException, ParseException, InvalidArgumentException { + Request request = null; + // sipuri + SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort()); + // via + ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>(); + ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()), + parentPlatform.getTransport(), null); + viaHeader.setRPort(); + viaHeaders.add(viaHeader); + // from + SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(), + parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort()); + Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI); + FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, fromTag); + // to + SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain()); + Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI); + ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, toTag); + + // Forwards + MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); + // ceq + CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.NOTIFY), Request.NOTIFY); + MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory(); + // 璁剧疆缂栫爜锛� 闃叉涓枃涔辩爜 + messageFactory.setDefaultContentEncodingCharset("gb2312"); + request = messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader, + toHeader, viaHeaders, maxForwards); + List<String> agentParam = new ArrayList<>(); + agentParam.add("wvp-pro"); + UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam); + request.addHeader(userAgentHeader); + + ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml"); + request.setContent(content, contentTypeHeader); + return request; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index cf8b0a5..1707bde 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -3,9 +3,11 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; +import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,7 +94,7 @@ callIdHeader = udpSipProvider.getNewCallId(); } - request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, 1L, "FromRegister" + tm, null, callIdHeader); + request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm, null, callIdHeader); // 灏� callid 鍐欏叆缂撳瓨锛� 绛夋敞鍐屾垚鍔熷彲浠ユ洿鏂扮姸鎬� redisCatchStorage.updatePlatformRegisterInfo(callIdHeader.getCallId(), parentPlatform.getServerGBId()); @@ -325,4 +327,41 @@ } return true; } + + @Override + public boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) { + if (parentPlatform == null) { + return false; + } + + try { + StringBuffer deviceStatusXml = new StringBuffer(600); + deviceStatusXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n"); + deviceStatusXml.append("<Notify>\r\n"); + deviceStatusXml.append("<CmdType>MobilePosition</CmdType>\r\n"); + deviceStatusXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n"); + deviceStatusXml.append("<DeviceID>" + gpsMsgInfo.getId() + "</DeviceID>\r\n"); + deviceStatusXml.append("<Time>" + gpsMsgInfo.getTime() + "</Time>\r\n"); + deviceStatusXml.append("<Longitude>" + gpsMsgInfo.getLng() + "</Longitude>\r\n"); + deviceStatusXml.append("<Latitude>" + gpsMsgInfo.getLat() + "</Latitude>\r\n"); + deviceStatusXml.append("<Speed>" + gpsMsgInfo.getSpeed() + "</Speed>\r\n"); + deviceStatusXml.append("<Direction>" + gpsMsgInfo.getDirection() + "</Direction>\r\n"); + deviceStatusXml.append("<Altitude>" + gpsMsgInfo.getAltitude() + "</Altitude>\r\n"); + deviceStatusXml.append("</Notify>\r\n"); + + CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() + : udpSipProvider.getNewCallId(); + callIdHeader.setCallId(subscribeInfo.getCallId()); + + String tm = Long.toString(System.currentTimeMillis()); + + Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, deviceStatusXml.toString(), subscribeInfo.getToTag(), subscribeInfo.getFromTag(), callIdHeader); + transmitRequest(parentPlatform, request); + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + return false; + } + return true; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java index e7f8f72..d4de725 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java @@ -18,6 +18,7 @@ import javax.sip.address.AddressFactory; import javax.sip.address.SipURI; import javax.sip.header.ContentTypeHeader; +import javax.sip.header.ExpiresHeader; import javax.sip.header.HeaderFactory; import javax.sip.header.ViaHeader; import javax.sip.message.MessageFactory; @@ -153,7 +154,7 @@ * @throws InvalidArgumentException * @throws ParseException */ - public void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException { + public void responseSdpAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException { Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest()); SipFactory sipFactory = SipFactory.getInstance(); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP"); @@ -168,6 +169,31 @@ getServerTransaction(evt).sendResponse(response); } + /** + * 鍥炲甯ml鐨�200 + * @param evt + * @param xml + * @throws SipException + * @throws InvalidArgumentException + * @throws ParseException + */ + public Response responseXmlAck(RequestEvent evt, String xml) throws SipException, InvalidArgumentException, ParseException { + Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest()); + SipFactory sipFactory = SipFactory.getInstance(); + ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml"); + response.setContent(xml, contentTypeHeader); + + SipURI sipURI = (SipURI)evt.getRequest().getRequestURI(); + + Address concatAddress = sipFactory.createAddressFactory().createAddress( + sipFactory.createAddressFactory().createSipURI(sipURI.getUser(), sipURI.getHost()+":"+sipURI.getPort() + )); + response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); + response.addHeader(evt.getRequest().getHeader(ExpiresHeader.NAME)); + getServerTransaction(evt).sendResponse(response); + return response; + } + public Element getRootElement(RequestEvent evt) throws DocumentException { return getRootElement(evt, "gb2312"); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 203d58b..2a9abad 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -253,7 +253,7 @@ content.append("f=\r\n"); try { - responseAck(evt, content.toString()); + responseSdpAck(evt, content.toString()); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { @@ -310,7 +310,7 @@ content.append("f=\r\n"); try { - responseAck(evt, content.toString()); + responseSdpAck(evt, content.toString()); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 2e58d9d..67bb56c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -62,9 +62,7 @@ @Autowired private DeviceOffLineDetector offLineDetector; - private static final String NOTIFY_CATALOG = "Catalog"; - private static final String NOTIFY_ALARM = "Alarm"; - private static final String NOTIFY_MOBILE_POSITION = "MobilePosition"; + private String method = "NOTIFY"; @Autowired @@ -82,13 +80,13 @@ Element rootElement = getRootElement(evt); String cmd = XmlUtil.getText(rootElement, "CmdType"); - if (NOTIFY_CATALOG.equals(cmd)) { + if (CmdType.CATALOG.equals(cmd)) { logger.info("鎺ユ敹鍒癈atalog閫氱煡"); processNotifyCatalogList(evt); - } else if (NOTIFY_ALARM.equals(cmd)) { + } else if (CmdType.ALARM.equals(cmd)) { logger.info("鎺ユ敹鍒癆larm閫氱煡"); processNotifyAlarm(evt); - } else if (NOTIFY_MOBILE_POSITION.equals(cmd)) { + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); processNotifyMobilePosition(evt); } else { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java index be4b2ce..9c5be8e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -1,8 +1,21 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.gb28181.bean.CmdType; +import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; +import com.genersoft.iot.vmp.gb28181.task.GPSSubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import org.dom4j.DocumentException; +import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -13,7 +26,10 @@ import javax.sip.RequestEvent; import javax.sip.ServerTransaction; import javax.sip.SipException; +import javax.sip.header.CallIdHeader; import javax.sip.header.ExpiresHeader; +import javax.sip.header.Header; +import javax.sip.header.ToHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; @@ -29,6 +45,21 @@ @Autowired private SIPProcessorObserver sipProcessorObserver; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private ISIPCommanderForPlatform sipCommanderForPlatform; + + @Autowired + private IVideoManagerStorager storager; + + @Autowired + private DynamicTask dynamicTask; + + @Autowired + private UserSetup userSetup; @Override public void afterPropertiesSet() throws Exception { @@ -46,21 +77,39 @@ Request request = evt.getRequest(); try { - Response response = null; - response = getMessageFactory().createResponse(200, request); - if (response != null) { - ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30); - response.setExpires(expireHeader); - } - logger.info("response : " + response.toString()); - ServerTransaction transaction = getServerTransaction(evt); - if (transaction != null) { - transaction.sendResponse(response); - transaction.getDialog().delete(); - transaction.terminate(); + Element rootElement = getRootElement(evt); + String cmd = XmlUtil.getText(rootElement, "CmdType"); + if (CmdType.MOBILE_POSITION.equals(cmd)) { + logger.info("鎺ユ敹鍒癕obilePosition璁㈤槄"); + processNotifyMobilePosition(evt, rootElement); +// } else if (CmdType.ALARM.equals(cmd)) { +// logger.info("鎺ユ敹鍒癆larm璁㈤槄"); +// processNotifyAlarm(evt, rootElement); +// } else if (CmdType.CATALOG.equals(cmd)) { +// logger.info("鎺ユ敹鍒癈atalog璁㈤槄"); +// processNotifyCatalogList(evt, rootElement); } else { - logger.info("processRequest serverTransactionId is null."); + logger.info("鎺ユ敹鍒版秷鎭細" + cmd); +// responseAck(evt, Response.OK); + + Response response = null; + response = getMessageFactory().createResponse(200, request); + if (response != null) { + ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30); + response.setExpires(expireHeader); + } + logger.info("response : " + response.toString()); + ServerTransaction transaction = getServerTransaction(evt); + if (transaction != null) { + transaction.sendResponse(response); + transaction.getDialog().delete(); + transaction.terminate(); + } else { + logger.info("processRequest serverTransactionId is null."); + } } + + } catch (ParseException e) { e.printStackTrace(); @@ -68,8 +117,67 @@ e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); + } catch (DocumentException e) { + e.printStackTrace(); } - + + } + + /** + * 澶勭悊绉诲姩浣嶇疆璁㈤槄娑堟伅 + */ + private void processNotifyMobilePosition(RequestEvent evt, Element rootElement) { + String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); + String deviceID = XmlUtil.getText(rootElement, "DeviceID"); + SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId); + String sn = XmlUtil.getText(rootElement, "SN"); + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_MobilePosition_" + platformId; + + StringBuilder resultXml = new StringBuilder(200); + resultXml.append("<?xml version=\"1.0\" ?>\r\n") + .append("<Response>\r\n") + .append("<CmdType>MobilePosition</CmdType>\r\n") + .append("<SN>" + sn + "</SN>\r\n") + .append("<DeviceID>" + deviceID + "</DeviceID>\r\n") + .append("<Result>OK</Result>\r\n") + .append("</Response>\r\n"); + + if (subscribeInfo.getExpires() > 0) { + if (redisCatchStorage.getSubscribe(key) != null) { + dynamicTask.stopCron(key); + } + String interval = XmlUtil.getText(rootElement, "Interval"); // GPS涓婃姤鏃堕棿闂撮殧 + dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key), Integer.parseInt(interval)); + + redisCatchStorage.updateSubscribe(key, subscribeInfo); + }else if (subscribeInfo.getExpires() == 0) { + dynamicTask.stopCron(key); + redisCatchStorage.delSubscribe(key); + } + + + + try { + Response response = responseXmlAck(evt, resultXml.toString()); + ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME); + subscribeInfo.setToTag(toHeader.getTag()); + redisCatchStorage.updateSubscribe(key, subscribeInfo); + + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + } + + private void processNotifyAlarm(RequestEvent evt, Element rootElement) { + + } + + private void processNotifyCatalogList(RequestEvent evt, Element rootElement) { + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java b/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java index 3c5b2bf..855cc5e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java @@ -23,7 +23,7 @@ private double speed; /** - * 浜х敓閫氱煡鏃堕棿, + * 浜х敓閫氱煡鏃堕棿, 鏃堕棿鏍煎紡锛� 2020-01-14T14:32:12 */ private String time; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java index e3bfcde..4e390b7 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java @@ -17,6 +17,7 @@ @Override public void onMessage(Message message, byte[] bytes) { GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class); + System.out.println(JSON.toJSON(gpsMsgInfo)); redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 709d548..ea85cf1 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -2,10 +2,7 @@ import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.ThirdPartyGB; @@ -196,4 +193,16 @@ void resetAllCSEQ(); void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo); + + GPSMsgInfo getGpsMsgInfo(String gbId); + + Long getSN(String method); + + void resetAllSN(); + + void updateSubscribe(String key, SubscribeInfo subscribeInfo); + + SubscribeInfo getSubscribe(String key); + + void delSubscribe(String key); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java index fa6b51c..3a26b4e 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java @@ -54,6 +54,11 @@ "WHERE pgs.platformId = '${platformId}'") List<GbStream> queryGbStreamListInPlatform(String platformId); + + @Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs " + + "ON gs.app = pgs.app and gs.stream = pgs.stream WHERE pgs.app is NULL and pgs.stream is NULL") + List<GbStream> queryStreamNotInPlatform(); + @Update("UPDATE gb_stream " + "SET status=${status} " + "WHERE app=#{app} AND stream=#{stream}") @@ -87,4 +92,6 @@ "</foreach> " + "</script>") void batchAdd(List<StreamPushItem> subList); + + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 6ea768a..adda231 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -50,8 +50,30 @@ } @Override + public Long getSN(String method) { + String key = VideoManagerConstants.SIP_SN_PREFIX + userSetup.getServerId() + "_" + method; + + long result = redis.incr(key, 1L); + if (result > Integer.MAX_VALUE) { + redis.set(key, 1); + result = 1; + } + return result; + } + + @Override public void resetAllCSEQ() { String scanKey = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetup.getServerId() + "_*"; + List<Object> keys = redis.scan(scanKey); + for (int i = 0; i < keys.size(); i++) { + String key = (String) keys.get(i); + redis.set(key, 1); + } + } + + @Override + public void resetAllSN() { + String scanKey = VideoManagerConstants.SIP_SN_PREFIX + userSetup.getServerId() + "_*"; List<Object> keys = redis.scan(scanKey); for (int i = 0; i < keys.size(); i++) { String key = (String) keys.get(i); @@ -433,4 +455,25 @@ String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gpsMsgInfo.getId(); redis.set(key, gpsMsgInfo); } + + @Override + public GPSMsgInfo getGpsMsgInfo(String gbId) { + String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gbId; + return (GPSMsgInfo)redis.get(key); + } + + @Override + public void updateSubscribe(String key, SubscribeInfo subscribeInfo) { + redis.set(key, subscribeInfo, subscribeInfo.getExpires()); + } + + @Override + public SubscribeInfo getSubscribe(String key) { + return (SubscribeInfo)redis.get(key); + } + + @Override + public void delSubscribe(String key) { + redis.del(key); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java index 7f0efcd..d381574 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -486,18 +486,21 @@ // 鏇存柊缂撳瓨 parentPlatformCatch.setParentPlatform(parentPlatform); redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); - // 鍏变韩鎵�鏈夎棰戞祦锛岄渶瑕佸皢鐜版湁瑙嗛娴佹坊鍔犲埌姝ゅ钩鍙� - List<GbStream> gbStreams = gbStreamMapper.selectAll(); - if (gbStreams.size() > 0) { - for (GbStream gbStream : gbStreams) { - gbStream.setCatalogId(parentPlatform.getCatalogId()); - } - if (parentPlatform.isShareAllLiveStream()) { - gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId()); - }else { - gbStreamService.delPlatformInfo(gbStreams); + if (parentPlatform.isEnable()) { + // 鍏变韩鎵�鏈夎棰戞祦锛岄渶瑕佸皢鐜版湁瑙嗛娴佹坊鍔犲埌姝ゅ钩鍙� + List<GbStream> gbStreams = gbStreamMapper.queryStreamNotInPlatform(); + if (gbStreams.size() > 0) { + for (GbStream gbStream : gbStreams) { + gbStream.setCatalogId(parentPlatform.getCatalogId()); + } + if (parentPlatform.isShareAllLiveStream()) { + gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId()); + }else { + gbStreamService.delPlatformInfo(gbStreams); + } } } + return result > 0; } -- Gitblit v1.8.0