lin
2022-01-08 ddb36e54bd51761138c536ccca889d3f80182334
级联平台添加GPS订阅支持
14个文件已修改
4个文件已添加
572 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java 78 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java 52 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java 59 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java 136 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -58,6 +58,10 @@
    public static final String SIP_CSEQ_PREFIX = "VMP_SIP_CSEQ_";
    public static final String SIP_SN_PREFIX = "VMP_SIP_SN_";
    public static final String SIP_SUBSCRIBE_PREFIX = "SIP_SUBSCRIBE_";
    //************************** redis 消息*********************************
    public static final String WVP_MSG_STREAM_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_";
    public static final String WVP_MSG_GPS_PREFIX = "WVP_MSG_GPS_";
src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java
New file
@@ -0,0 +1,8 @@
package com.genersoft.iot.vmp.gb28181.bean;
public class CmdType {
    public static final String CATALOG = "Catalog";
    public static final String ALARM = "Alarm";
    public static final String MOBILE_POSITION = "MobilePosition";
}
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java
New file
@@ -0,0 +1,78 @@
package com.genersoft.iot.vmp.gb28181.bean;
import javax.sip.RequestEvent;
import javax.sip.header.*;
import javax.sip.message.Request;
public class SubscribeInfo {
    public SubscribeInfo() {
    }
    public SubscribeInfo(RequestEvent evt, String id) {
        this.id = id;
        Request request = evt.getRequest();
        CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
        this.callId = callIdHeader.getCallId();
        FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
        this.fromTag = fromHeader.getTag();
        ExpiresHeader expiresHeader = (ExpiresHeader)request.getHeader(ExpiresHeader.NAME);
        this.expires = expiresHeader.getExpires();
        this.event = (EventHeader)request.getHeader(EventHeader.NAME);
    }
    private String id;
    private int expires;
    private String callId;
    private EventHeader event;
    private String fromTag;
    private String toTag;
    public String getId() {
        return id;
    }
    public int getExpires() {
        return expires;
    }
    public String getCallId() {
        return callId;
    }
    public EventHeader getEvent() {
        return event;
    }
    public String getFromTag() {
        return fromTag;
    }
    public void setToTag(String toTag) {
        this.toTag = toTag;
    }
    public String getToTag() {
        return toTag;
    }
    public void setId(String id) {
        this.id = id;
    }
    public void setExpires(int expires) {
        this.expires = expires;
    }
    public void setCallId(String callId) {
        this.callId = callId;
    }
    public void setEvent(EventHeader event) {
        this.event = event;
    }
    public void setFromTag(String fromTag) {
        this.fromTag = fromTag;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java
New file
@@ -0,0 +1,52 @@
package com.genersoft.iot.vmp.gb28181.event.subscribe;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import org.checkerframework.checker.units.qual.A;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/**
 * 平台订阅到期事件
 */
@Component
public class SubscribeListenerForPlatform extends RedisKeyExpirationEventMessageListener {
    private Logger logger = LoggerFactory.getLogger(SubscribeListenerForPlatform.class);
    @Autowired
    private UserSetup userSetup;
    @Autowired
    private DynamicTask dynamicTask;
    public SubscribeListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) {
        super(listenerContainer, userSetup);
    }
    /**
     * 监听失效的key
     * @param message
     * @param pattern
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        //  获取失效的key
        String expiredKey = message.toString();
        logger.debug(expiredKey);
        // 订阅到期
        String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_";
        if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
            // 取消定时任务
            dynamicTask.stopCron(expiredKey);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java
New file
@@ -0,0 +1,59 @@
package com.genersoft.iot.vmp.gb28181.task;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import java.text.SimpleDateFormat;
import java.util.List;
public class GPSSubscribeTask implements Runnable{
    private IRedisCatchStorage redisCatchStorage;
    private IVideoManagerStorager storager;
    private ISIPCommanderForPlatform sipCommanderForPlatform;
    private String platformId;
    private String sn;
    private String key;
    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    public GPSSubscribeTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorager storager, String platformId, String sn, String key) {
        this.redisCatchStorage = redisCatchStorage;
        this.storager = storager;
        this.platformId = platformId;
        this.sn = sn;
        this.key = key;
        this.sipCommanderForPlatform = sipCommanderForPlatform;
    }
    @Override
    public void run() {
        SubscribeInfo subscribe = redisCatchStorage.getSubscribe(key);
        if (subscribe != null) {
            System.out.println("发送GPS消息");
            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
            if (parentPlatform == null || parentPlatform.isStatus()) {
                // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
                List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platformId);
                if (gbStreams.size() > 0) {
                    for (GbStream gbStream : gbStreams) {
                        String gbId = gbStream.getGbId();
                        GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
                        if (gpsMsgInfo != null && gbStream.isStatus()) {
                            // 发送GPS消息
                            sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
                        }
                    }
                }
            }
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java
@@ -2,7 +2,9 @@
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import javax.sip.header.WWWAuthenticateHeader;
@@ -61,4 +63,12 @@
     */
    boolean deviceStatusResponse(ParentPlatform parentPlatform, String sn, String fromTag);
    /**
     * 向上级回复移动位置订阅消息
     * @param parentPlatform 平台信息
     * @param gpsMsgInfo GPS信息
     * @param subscribeInfo 订阅相关的信息
     * @return
     */
    boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo);
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
@@ -2,6 +2,7 @@
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import gov.nist.javax.sip.message.MessageFactoryImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -32,6 +33,9 @@
    @Autowired
    private SipFactory sipFactory;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    public Request createKeetpaliveMessageRequest(ParentPlatform parentPlatform, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
        Request request = null;
@@ -57,7 +61,7 @@
        // Forwards
        MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
        // ceq
        CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.MESSAGE);
        CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE);
        request = sipFactory.createMessageFactory().createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader,
                toHeader, viaHeaders, maxForwards);
@@ -122,7 +126,7 @@
                                         String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader) throws ParseException, PeerUnavailableException, InvalidArgumentException {
        Request registerRequest = createRegisterRequest(parentPlatform, 2L, fromTag, viaTag, callIdHeader);
        Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), fromTag, viaTag, callIdHeader);
        String realm = www.getRealm();
        String nonce = www.getNonce();
@@ -208,7 +212,7 @@
        // Forwards
        MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
        // ceq
        CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.MESSAGE);
        CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE);
        MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
        // 设置编码, 防止中文乱码
        messageFactory.setDefaultContentEncodingCharset("gb2312");
@@ -223,4 +227,43 @@
        request.setContent(content, contentTypeHeader);
        return request;
    }
    public Request createNotifyRequest(ParentPlatform parentPlatform, String content, String fromTag, String toTag, CallIdHeader callIdHeader) throws PeerUnavailableException, ParseException, InvalidArgumentException {
        Request request = null;
        // sipuri
        SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort());
        // via
        ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
        ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()),
                parentPlatform.getTransport(), null);
        viaHeader.setRPort();
        viaHeaders.add(viaHeader);
        // from
        SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(),
                parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort());
        Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI);
        FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, fromTag);
        // to
        SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain());
        Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI);
        ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, toTag);
        // Forwards
        MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
        // ceq
        CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.NOTIFY), Request.NOTIFY);
        MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
        // 设置编码, 防止中文乱码
        messageFactory.setDefaultContentEncodingCharset("gb2312");
        request = messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader,
                toHeader, viaHeaders, maxForwards);
        List<String> agentParam = new ArrayList<>();
        agentParam.add("wvp-pro");
        UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
        request.addHeader(userAgentHeader);
        ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml");
        request.setContent(content, contentTypeHeader);
        return request;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -3,9 +3,11 @@
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,7 +94,7 @@
                    callIdHeader = udpSipProvider.getNewCallId();
                }
                request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, 1L, "FromRegister" + tm, null, callIdHeader);
                request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm, null, callIdHeader);
                // 将 callid 写入缓存, 等注册成功可以更新状态
                redisCatchStorage.updatePlatformRegisterInfo(callIdHeader.getCallId(), parentPlatform.getServerGBId());
@@ -325,4 +327,41 @@
        }
        return true;
    }
    @Override
    public boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) {
        if (parentPlatform == null) {
            return false;
        }
        try {
            StringBuffer deviceStatusXml = new StringBuffer(600);
            deviceStatusXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
            deviceStatusXml.append("<Notify>\r\n");
            deviceStatusXml.append("<CmdType>MobilePosition</CmdType>\r\n");
            deviceStatusXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
            deviceStatusXml.append("<DeviceID>" + gpsMsgInfo.getId() + "</DeviceID>\r\n");
            deviceStatusXml.append("<Time>" + gpsMsgInfo.getTime() + "</Time>\r\n");
            deviceStatusXml.append("<Longitude>" + gpsMsgInfo.getLng() + "</Longitude>\r\n");
            deviceStatusXml.append("<Latitude>" + gpsMsgInfo.getLat() + "</Latitude>\r\n");
            deviceStatusXml.append("<Speed>" + gpsMsgInfo.getSpeed() + "</Speed>\r\n");
            deviceStatusXml.append("<Direction>" + gpsMsgInfo.getDirection() + "</Direction>\r\n");
            deviceStatusXml.append("<Altitude>" + gpsMsgInfo.getAltitude() + "</Altitude>\r\n");
            deviceStatusXml.append("</Notify>\r\n");
            CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            callIdHeader.setCallId(subscribeInfo.getCallId());
            String tm = Long.toString(System.currentTimeMillis());
            Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, deviceStatusXml.toString(), subscribeInfo.getToTag(), subscribeInfo.getFromTag(), callIdHeader);
            transmitRequest(parentPlatform, request);
        } catch (SipException | ParseException | InvalidArgumentException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
@@ -18,6 +18,7 @@
import javax.sip.address.AddressFactory;
import javax.sip.address.SipURI;
import javax.sip.header.ContentTypeHeader;
import javax.sip.header.ExpiresHeader;
import javax.sip.header.HeaderFactory;
import javax.sip.header.ViaHeader;
import javax.sip.message.MessageFactory;
@@ -153,7 +154,7 @@
     * @throws InvalidArgumentException
     * @throws ParseException
     */
    public void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
    public void responseSdpAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
        Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
        SipFactory sipFactory = SipFactory.getInstance();
        ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
@@ -168,6 +169,31 @@
        getServerTransaction(evt).sendResponse(response);
    }
    /**
     * 回复带xml的200
     * @param evt
     * @param xml
     * @throws SipException
     * @throws InvalidArgumentException
     * @throws ParseException
     */
    public Response responseXmlAck(RequestEvent evt, String xml) throws SipException, InvalidArgumentException, ParseException {
        Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
        SipFactory sipFactory = SipFactory.getInstance();
        ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml");
        response.setContent(xml, contentTypeHeader);
        SipURI sipURI = (SipURI)evt.getRequest().getRequestURI();
        Address concatAddress = sipFactory.createAddressFactory().createAddress(
                sipFactory.createAddressFactory().createSipURI(sipURI.getUser(),  sipURI.getHost()+":"+sipURI.getPort()
                ));
        response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
        response.addHeader(evt.getRequest().getHeader(ExpiresHeader.NAME));
        getServerTransaction(evt).sendResponse(response);
        return response;
    }
    public Element getRootElement(RequestEvent evt) throws DocumentException {
        return getRootElement(evt, "gb2312");
    }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -253,7 +253,7 @@
                        content.append("f=\r\n");
                        try {
                            responseAck(evt, content.toString());
                            responseSdpAck(evt, content.toString());
                        } catch (SipException e) {
                            e.printStackTrace();
                        } catch (InvalidArgumentException e) {
@@ -310,7 +310,7 @@
                    content.append("f=\r\n");
                    try {
                        responseAck(evt, content.toString());
                        responseSdpAck(evt, content.toString());
                    } catch (SipException e) {
                        e.printStackTrace();
                    } catch (InvalidArgumentException e) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -62,9 +62,7 @@
    @Autowired
    private DeviceOffLineDetector offLineDetector;
    private static final String NOTIFY_CATALOG = "Catalog";
    private static final String NOTIFY_ALARM = "Alarm";
    private static final String NOTIFY_MOBILE_POSITION = "MobilePosition";
    private String method = "NOTIFY";
    @Autowired
@@ -82,13 +80,13 @@
            Element rootElement = getRootElement(evt);
            String cmd = XmlUtil.getText(rootElement, "CmdType");
            if (NOTIFY_CATALOG.equals(cmd)) {
            if (CmdType.CATALOG.equals(cmd)) {
                logger.info("接收到Catalog通知");
                processNotifyCatalogList(evt);
            } else if (NOTIFY_ALARM.equals(cmd)) {
            } else if (CmdType.ALARM.equals(cmd)) {
                logger.info("接收到Alarm通知");
                processNotifyAlarm(evt);
            } else if (NOTIFY_MOBILE_POSITION.equals(cmd)) {
            } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
                logger.info("接收到MobilePosition通知");
                processNotifyMobilePosition(evt);
            } else {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
@@ -1,8 +1,21 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.CmdType;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.task.GPSSubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -13,7 +26,10 @@
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.header.ExpiresHeader;
import javax.sip.header.Header;
import javax.sip.header.ToHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
@@ -29,6 +45,21 @@
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private ISIPCommanderForPlatform sipCommanderForPlatform;
    @Autowired
    private IVideoManagerStorager storager;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private UserSetup userSetup;
    @Override
    public void afterPropertiesSet() throws Exception {
@@ -46,21 +77,39 @@
        Request request = evt.getRequest();
        try {
            Response response = null;
            response = getMessageFactory().createResponse(200, request);
            if (response != null) {
                ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
                response.setExpires(expireHeader);
            }
            logger.info("response : " + response.toString());
            ServerTransaction transaction = getServerTransaction(evt);
            if (transaction != null) {
                transaction.sendResponse(response);
                transaction.getDialog().delete();
                transaction.terminate();
            Element rootElement = getRootElement(evt);
            String cmd = XmlUtil.getText(rootElement, "CmdType");
            if (CmdType.MOBILE_POSITION.equals(cmd)) {
                logger.info("接收到MobilePosition订阅");
                processNotifyMobilePosition(evt, rootElement);
//            } else if (CmdType.ALARM.equals(cmd)) {
//                logger.info("接收到Alarm订阅");
//                processNotifyAlarm(evt, rootElement);
//            } else if (CmdType.CATALOG.equals(cmd)) {
//                logger.info("接收到Catalog订阅");
//                processNotifyCatalogList(evt, rootElement);
            } else {
                logger.info("processRequest serverTransactionId is null.");
                logger.info("接收到消息:" + cmd);
//                responseAck(evt, Response.OK);
                Response response = null;
                response = getMessageFactory().createResponse(200, request);
                if (response != null) {
                    ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
                    response.setExpires(expireHeader);
                }
                logger.info("response : " + response.toString());
                ServerTransaction transaction = getServerTransaction(evt);
                if (transaction != null) {
                    transaction.sendResponse(response);
                    transaction.getDialog().delete();
                    transaction.terminate();
                } else {
                    logger.info("processRequest serverTransactionId is null.");
                }
            }
        } catch (ParseException e) {
            e.printStackTrace();
@@ -68,8 +117,67 @@
            e.printStackTrace();
        } catch (InvalidArgumentException e) {
            e.printStackTrace();
        } catch (DocumentException e) {
            e.printStackTrace();
        }
    }
    /**
     * 处理移动位置订阅消息
     */
    private void processNotifyMobilePosition(RequestEvent evt, Element rootElement) {
        String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
        String deviceID = XmlUtil.getText(rootElement, "DeviceID");
        SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
        String sn = XmlUtil.getText(rootElement, "SN");
        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() +  "_MobilePosition_" + platformId;
        StringBuilder resultXml = new StringBuilder(200);
        resultXml.append("<?xml version=\"1.0\" ?>\r\n")
                .append("<Response>\r\n")
                .append("<CmdType>MobilePosition</CmdType>\r\n")
                .append("<SN>" + sn + "</SN>\r\n")
                .append("<DeviceID>" + deviceID + "</DeviceID>\r\n")
                .append("<Result>OK</Result>\r\n")
                .append("</Response>\r\n");
        if (subscribeInfo.getExpires() > 0) {
            if (redisCatchStorage.getSubscribe(key) != null) {
                dynamicTask.stopCron(key);
            }
            String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
            dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, sn, key), Integer.parseInt(interval));
            redisCatchStorage.updateSubscribe(key, subscribeInfo);
        }else if (subscribeInfo.getExpires() == 0) {
            dynamicTask.stopCron(key);
            redisCatchStorage.delSubscribe(key);
        }
        try {
            Response response = responseXmlAck(evt, resultXml.toString());
            ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME);
            subscribeInfo.setToTag(toHeader.getTag());
            redisCatchStorage.updateSubscribe(key, subscribeInfo);
        } catch (SipException e) {
            e.printStackTrace();
        } catch (InvalidArgumentException e) {
            e.printStackTrace();
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
    private void processNotifyAlarm(RequestEvent evt, Element rootElement) {
    }
    private void processNotifyCatalogList(RequestEvent evt, Element rootElement) {
    }
}
src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java
@@ -23,7 +23,7 @@
    private double speed;
    /**
     * 产生通知时间,
     * 产生通知时间, 时间格式: 2020-01-14T14:32:12
     */
    private String time;
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java
@@ -17,6 +17,7 @@
    @Override
    public void onMessage(Message message, byte[] bytes) {
        GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class);
        System.out.println(JSON.toJSON(gpsMsgInfo));
        redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
    }
}
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -2,10 +2,7 @@
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
@@ -196,4 +193,16 @@
    void resetAllCSEQ();
    void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo);
    GPSMsgInfo getGpsMsgInfo(String gbId);
    Long getSN(String method);
    void resetAllSN();
    void updateSubscribe(String key, SubscribeInfo subscribeInfo);
    SubscribeInfo getSubscribe(String key);
    void delSubscribe(String key);
}
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -54,6 +54,11 @@
            "WHERE pgs.platformId = '${platformId}'")
    List<GbStream> queryGbStreamListInPlatform(String platformId);
    @Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs  LEFT JOIN platform_gb_stream pgs " +
            "ON  gs.app = pgs.app and gs.stream = pgs.stream WHERE pgs.app is NULL and pgs.stream is NULL")
    List<GbStream> queryStreamNotInPlatform();
    @Update("UPDATE gb_stream " +
            "SET status=${status} " +
            "WHERE app=#{app} AND stream=#{stream}")
@@ -87,4 +92,6 @@
            "</foreach> " +
            "</script>")
    void batchAdd(List<StreamPushItem> subList);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -50,8 +50,30 @@
    }
    @Override
    public Long getSN(String method) {
        String key = VideoManagerConstants.SIP_SN_PREFIX  + userSetup.getServerId() + "_" +  method;
        long result =  redis.incr(key, 1L);
        if (result > Integer.MAX_VALUE) {
            redis.set(key, 1);
            result = 1;
        }
        return result;
    }
    @Override
    public void resetAllCSEQ() {
        String scanKey = VideoManagerConstants.SIP_CSEQ_PREFIX  + userSetup.getServerId() + "_*";
        List<Object> keys = redis.scan(scanKey);
        for (int i = 0; i < keys.size(); i++) {
            String key = (String) keys.get(i);
            redis.set(key, 1);
        }
    }
    @Override
    public void resetAllSN() {
        String scanKey = VideoManagerConstants.SIP_SN_PREFIX  + userSetup.getServerId() + "_*";
        List<Object> keys = redis.scan(scanKey);
        for (int i = 0; i < keys.size(); i++) {
            String key = (String) keys.get(i);
@@ -433,4 +455,25 @@
        String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gpsMsgInfo.getId();
        redis.set(key, gpsMsgInfo);
    }
    @Override
    public GPSMsgInfo getGpsMsgInfo(String gbId) {
        String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gbId;
        return (GPSMsgInfo)redis.get(key);
    }
    @Override
    public void updateSubscribe(String key, SubscribeInfo subscribeInfo) {
        redis.set(key, subscribeInfo, subscribeInfo.getExpires());
    }
    @Override
    public SubscribeInfo getSubscribe(String key) {
        return (SubscribeInfo)redis.get(key);
    }
    @Override
    public void delSubscribe(String key) {
        redis.del(key);
    }
}
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
@@ -486,18 +486,21 @@
        // 更新缓存
        parentPlatformCatch.setParentPlatform(parentPlatform);
        redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
        // 共享所有视频流,需要将现有视频流添加到此平台
        List<GbStream> gbStreams = gbStreamMapper.selectAll();
        if (gbStreams.size() > 0) {
            for (GbStream gbStream : gbStreams) {
                gbStream.setCatalogId(parentPlatform.getCatalogId());
            }
            if (parentPlatform.isShareAllLiveStream()) {
                gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId());
            }else {
                gbStreamService.delPlatformInfo(gbStreams);
        if (parentPlatform.isEnable()) {
            // 共享所有视频流,需要将现有视频流添加到此平台
            List<GbStream> gbStreams = gbStreamMapper.queryStreamNotInPlatform();
            if (gbStreams.size() > 0) {
                for (GbStream gbStream : gbStreams) {
                    gbStream.setCatalogId(parentPlatform.getCatalogId());
                }
                if (parentPlatform.isShareAllLiveStream()) {
                    gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId());
                }else {
                    gbStreamService.delPlatformInfo(gbStreams);
                }
            }
        }
        return result > 0;
    }