648540858
2021-11-05 eb4716ba82f13078dd88e967e7906080c0ac0205
添加目录订阅消息与接口
19个文件已添加
3103 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/ISIPRequestProcessor.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorAbstract.java 179 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 123 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 115 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 386 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/MessageRequestProcessor.java 1103 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java 384 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java 197 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java 75 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/ISIPResponseProcessor.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java 47 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java 97 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java 104 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/CatalogSubscribeTask.java 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java 61 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
New file
@@ -0,0 +1,42 @@
package com.genersoft.iot.vmp.conf;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
/**
 * 动态定时任务
 */
@Component
public class DynamicTask {
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;
    private Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        return new ThreadPoolTaskScheduler();
    }
    public String startCron(String key, Runnable task, String corn) {
        stopCron(key);
        ScheduledFuture future = threadPoolTaskScheduler.schedule(task, new CronTrigger(corn));
        futureMap.put(key, future);
        return "startCron";
    }
    public void stopCron(String key) {
        if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) {
            futureMap.get(key).cancel(true);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/ISIPRequestProcessor.java
New file
@@ -0,0 +1,14 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request;
import javax.sip.RequestEvent;
/**
 * @description: 对SIP事件进行处理,包括request, response, timeout, ioException, transactionTerminated,dialogTerminated
 * @author: panlinlin
 * @date:   2021年11月5日 15:47
 */
public interface ISIPRequestProcessor {
    void process(RequestEvent event);
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorAbstract.java
New file
@@ -0,0 +1,179 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.stack.SIPServerTransaction;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import javax.sip.*;
import javax.sip.address.Address;
import javax.sip.address.AddressFactory;
import javax.sip.address.SipURI;
import javax.sip.header.ContentTypeHeader;
import javax.sip.header.HeaderFactory;
import javax.sip.header.ViaHeader;
import javax.sip.message.MessageFactory;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.io.ByteArrayInputStream;
import java.text.ParseException;
/**
 * @description:处理接收IPCamera发来的SIP协议请求消息
 * @author: songww
 * @date:   2020年5月3日 下午4:42:22
 */
public abstract class SIPRequestProcessorAbstract implements InitializingBean, ISIPRequestProcessor {
    private final static Logger logger = LoggerFactory.getLogger(SIPRequestProcessorAbstract.class);
    @Autowired
    @Qualifier(value="tcpSipProvider")
    private SipProviderImpl tcpSipProvider;
    @Autowired
    @Qualifier(value="udpSipProvider")
    private SipProviderImpl udpSipProvider;
    /**
     * 根据 RequestEvent 获取 ServerTransaction
     * @param evt
     * @return
     */
    public ServerTransaction getServerTransaction(RequestEvent evt) {
        Request request = evt.getRequest();
        ServerTransaction serverTransaction = evt.getServerTransaction();
        // 判断TCP还是UDP
        boolean isTcp = false;
        ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
        String transport = reqViaHeader.getTransport();
        if (transport.equals("TCP")) {
            isTcp = true;
        }
        if (serverTransaction == null) {
            try {
                if (isTcp) {
                    SipStackImpl stack = (SipStackImpl)tcpSipProvider.getSipStack();
                    serverTransaction = (SIPServerTransaction) stack.findTransaction((SIPRequest)request, true);
                    if (serverTransaction == null) {
                        serverTransaction = tcpSipProvider.getNewServerTransaction(request);
                    }
                } else {
                    SipStackImpl stack = (SipStackImpl)udpSipProvider.getSipStack();
                    serverTransaction = (SIPServerTransaction) stack.findTransaction((SIPRequest)request, true);
                    if (serverTransaction == null) {
                        serverTransaction = udpSipProvider.getNewServerTransaction(request);
                    }
                }
            } catch (TransactionAlreadyExistsException e) {
                logger.error(e.getMessage());
            } catch (TransactionUnavailableException e) {
                logger.error(e.getMessage());
            }
        }
        return serverTransaction;
    }
    public AddressFactory getAddressFactory() {
        try {
            return SipFactory.getInstance().createAddressFactory();
        } catch (PeerUnavailableException e) {
            e.printStackTrace();
        }
        return null;
    }
    public HeaderFactory getHeaderFactory() {
        try {
            return SipFactory.getInstance().createHeaderFactory();
        } catch (PeerUnavailableException e) {
            e.printStackTrace();
        }
        return null;
    }
    public MessageFactory getMessageFactory() {
        try {
            return SipFactory.getInstance().createMessageFactory();
        } catch (PeerUnavailableException e) {
            e.printStackTrace();
        }
        return null;
    }
    /***
     * 回复状态码
     * 100 trying
     * 200 OK
     * 400
     * 404
     * @param evt
     * @throws SipException
     * @throws InvalidArgumentException
     * @throws ParseException
     */
    public void responseAck(RequestEvent evt, int statusCode) throws SipException, InvalidArgumentException, ParseException {
        Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
        ServerTransaction serverTransaction = getServerTransaction(evt);
        serverTransaction.sendResponse(response);
        if (statusCode >= 200) {
            if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
        }
    }
    public void responseAck(RequestEvent evt, int statusCode, String msg) throws SipException, InvalidArgumentException, ParseException {
        Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
        response.setReasonPhrase(msg);
        ServerTransaction serverTransaction = getServerTransaction(evt);
        serverTransaction.sendResponse(response);
        if (statusCode >= 200) {
            if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
        }
    }
    /**
     * 回复带sdp的200
     * @param evt
     * @param sdp
     * @throws SipException
     * @throws InvalidArgumentException
     * @throws ParseException
     */
    public void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
        Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
        SipFactory sipFactory = SipFactory.getInstance();
        ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
        response.setContent(sdp, contentTypeHeader);
        SipURI sipURI = (SipURI)evt.getRequest().getRequestURI();
        Address concatAddress = sipFactory.createAddressFactory().createAddress(
                sipFactory.createAddressFactory().createSipURI(sipURI.getUser(),  sipURI.getHost()+":"+sipURI.getPort()
                ));
        response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
        getServerTransaction(evt).sendResponse(response);
    }
    public Element getRootElement(RequestEvent evt) throws DocumentException {
        return getRootElement(evt, "gb2312");
    }
    public Element getRootElement(RequestEvent evt, String charset) throws DocumentException {
        if (charset == null) charset = "gb2312";
        Request request = evt.getRequest();
        SAXReader reader = new SAXReader();
        reader.setEncoding(charset);
        Document xml = reader.read(new ByteArrayInputStream(request.getRawContent()));
        return xml.getRootElement();
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
New file
@@ -0,0 +1,123 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.RequestEvent;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import java.util.HashMap;
import java.util.Map;
/**
 * @description:ACK请求处理器
 * @author: swwheihei
 * @date:   2020年5月3日 下午5:31:45
 */
@Component
public class AckRequestProcessor extends SIPRequestProcessorAbstract {
    private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class);
    private String method = "ACK";
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
        sipProcessorObserver.addRequestProcessor(method, this);
    }
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    @Autowired
    private IMediaServerService mediaServerService;
    /**
     * 处理  ACK请求
     *
     * @param evt
     */
    @Override
    public void process(RequestEvent evt) {
        Dialog dialog = evt.getDialog();
        if (dialog == null) return;
        if (dialog.getState()== DialogState.CONFIRMED) {
            String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
            String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
            SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, channelId);
            String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
            String deviceId = sendRtpItem.getDeviceId();
            StreamInfo streamInfo = null;
            if (deviceId == null) {
                streamInfo = new StreamInfo();
                streamInfo.setApp(sendRtpItem.getApp());
                streamInfo.setStreamId(sendRtpItem.getStreamId());
            }else {
                streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
                sendRtpItem.setStreamId(streamInfo.getStreamId());
                streamInfo.setApp("rtp");
            }
            redisCatchStorage.updateSendRTPSever(sendRtpItem);
            logger.info(platformGbId);
            logger.info(channelId);
            Map<String, Object> param = new HashMap<>();
            param.put("vhost","__defaultVhost__");
            param.put("app",streamInfo.getApp());
            param.put("stream",streamInfo.getStreamId());
            param.put("ssrc", sendRtpItem.getSsrc());
            param.put("dst_url",sendRtpItem.getIp());
            param.put("dst_port", sendRtpItem.getPort());
            param.put("is_udp", is_Udp);
            //param.put ("src_port", sendRtpItem.getLocalPort());
            // 设备推流查询,成功后才能转推
            boolean rtpPushed = false;
            long startTime = System.currentTimeMillis();
            while (!rtpPushed) {
                try {
                    if (System.currentTimeMillis() - startTime < 30 * 1000) {
                        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                        if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) {
                            rtpPushed = true;
                            logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]",
                                    streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort());
                            zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
                        } else {
                            logger.info("等待设备推流[{}/{}].......",
                                    streamInfo.getApp() ,streamInfo.getStreamId());
                            Thread.sleep(1000);
                            continue;
                        }
                    } else {
                        rtpPushed = true;
                        logger.info("设备推流[{}/{}]超时,终止向上级推流",
                                streamInfo.getApp() ,streamInfo.getStreamId());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
New file
@@ -0,0 +1,115 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
/**
 * @description: BYE请求处理器
 * @author: lawrencehj
 * @date:   2021年3月9日
 */
@Component
public class ByeRequestProcessor extends SIPRequestProcessorAbstract {
    private Logger logger = LoggerFactory.getLogger(ByeRequestProcessor.class);
    @Autowired
    private ISIPCommander cmder;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IVideoManagerStorager storager;
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    @Autowired
    private IMediaServerService mediaServerService;
    private String method = "BYE";
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
        sipProcessorObserver.addRequestProcessor(method, this);
    }
    /**
     * 处理BYE请求
     * @param evt
     */
    @Override
    public void process(RequestEvent evt) {
        try {
            responseAck(evt, Response.OK);
            Dialog dialog = evt.getDialog();
            if (dialog == null) return;
            if (dialog.getState().equals(DialogState.TERMINATED)) {
                String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
                String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
                SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, channelId);
                logger.info("收到bye, [{}/{}]", platformGbId, channelId);
                if (sendRtpItem != null){
                    String streamId = sendRtpItem.getStreamId();
                    Map<String, Object> param = new HashMap<>();
                    param.put("vhost","__defaultVhost__");
                    param.put("app",sendRtpItem.getApp());
                    param.put("stream",streamId);
                    param.put("ssrc",sendRtpItem.getSsrc());
                    logger.info("停止向上级推流:" + streamId);
                    MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                    zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
                    redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
                    if (zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId) == 0) {
                        logger.info(streamId + "无其它观看者,通知设备停止推流");
                        cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId);
                    }
                }
                // 可能是设备主动停止
                Device device = storager.queryVideoDeviceByChannelId(platformGbId);
                if (device != null) {
                    StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
                    if (streamInfo != null) {
                        redisCatchStorage.stopPlay(streamInfo);
                    }
                    storager.stopPlay(device.getDeviceId(), channelId);
                    mediaServerService.closeRTPServer(device, channelId);
                }
            }
        } catch (SipException e) {
            e.printStackTrace();
        } catch (InvalidArgumentException e) {
            e.printStackTrace();
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java
New file
@@ -0,0 +1,40 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
/**
 * @description:CANCEL请求处理器
 * @author: swwheihei
 * @date:   2020年5月3日 下午5:32:23
 */
@Component
public class CancelRequestProcessor extends SIPRequestProcessorAbstract {
    private String method = "CANCEL";
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
        sipProcessorObserver.addRequestProcessor(method, this);
    }
    /**
     * 处理CANCEL请求
     *
     * @param evt 事件
     */
    @Override
    public void process(RequestEvent evt) {
        // TODO 优先级99 Cancel Request消息实现,此消息一般为级联消息,上级给下级发送请求取消指令
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
New file
@@ -0,0 +1,386 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sdp.*;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.Vector;
/**
 * @description:处理INVITE请求
 * @author: panll
 * @date:   2021年1月14日
 */
@SuppressWarnings("rawtypes")
@Component
public class InviteRequestProcessor extends SIPRequestProcessorAbstract {
    private final static Logger logger = LoggerFactory.getLogger(InviteRequestProcessor.class);
    private String method = "INVITE";
    @Autowired
    private SIPCommanderFroPlatform cmderFroPlatform;
    @Autowired
    private IVideoManagerStorager storager;
    @Autowired
    private IRedisCatchStorage  redisCatchStorage;
    @Autowired
    private SIPCommander cmder;
    @Autowired
    private IPlayService playService;
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
        sipProcessorObserver.addRequestProcessor(method, this);
    }
    /**
     * 处理invite请求
     *
     * @param evt
     *            请求消息
     */
    @Override
    public void process(RequestEvent evt) {
        //  Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令
        try {
            Request request = evt.getRequest();
            SipURI sipURI = (SipURI) request.getRequestURI();
            String channelId = sipURI.getUser();
            String requesterId = null;
            FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
            AddressImpl address = (AddressImpl) fromHeader.getAddress();
            SipUri uri = (SipUri) address.getURI();
            requesterId = uri.getUser();
            if (requesterId == null || channelId == null) {
                logger.info("无法从FromHeader的Address中获取到平台id,返回400");
                responseAck(evt, Response.BAD_REQUEST); // 参数不全, 发400,请求错误
                return;
            }
            // 查询请求方是否上级平台
            ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
            if (platform != null) {
                // 查询平台下是否有该通道
                DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
                GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
                MediaServerItem mediaServerItem = null;
                // 不是通道可能是直播流
                if (channel != null && gbStream == null ) {
                    if (channel.getStatus() == 0) {
                        logger.info("通道离线,返回400");
                        responseAck(evt, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline");
                        return;
                    }
                    responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
                }else if(channel == null && gbStream != null){
                    String mediaServerId = gbStream.getMediaServerId();
                    mediaServerItem = mediaServerService.getOne(mediaServerId);
                    if (mediaServerItem == null) {
                        logger.info("[ app={}, stream={} ]找不到zlm {},返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId);
                        responseAck(evt, Response.GONE, "media server not found");
                        return;
                    }
                    Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
                    if (!streamReady ) {
                        logger.info("[ app={}, stream={} ]通道离线,返回400",gbStream.getApp(), gbStream.getStream());
                        responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
                        return;
                    }
                    responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
                }else {
                    logger.info("通道不存在,返回404");
                    responseAck(evt, Response.NOT_FOUND); // 通道不存在,发404,资源不存在
                    return;
                }
                // 解析sdp消息, 使用jainsip 自带的sdp解析方式
                String contentString = new String(request.getRawContent());
                // jainSip不支持y=字段, 移除移除以解析。
                int ssrcIndex = contentString.indexOf("y=");
                //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段
                String ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
                String substring = contentString.substring(0, contentString.indexOf("y="));
                SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
                //  获取支持的格式
                Vector mediaDescriptions = sdp.getMediaDescriptions(true);
                // 查看是否支持PS 负载96
                //String ip = null;
                int port = -1;
                //boolean recvonly = false;
                boolean mediaTransmissionTCP = false;
                Boolean tcpActive = null;
                for (Object description : mediaDescriptions) {
                    MediaDescription mediaDescription = (MediaDescription) description;
                    Media media = mediaDescription.getMedia();
                    Vector mediaFormats = media.getMediaFormats(false);
                    if (mediaFormats.contains("96")) {
                        port = media.getMediaPort();
                        //String mediaType = media.getMediaType();
                        String protocol = media.getProtocol();
                        // 区分TCP发流还是udp, 当前默认udp
                        if ("TCP/RTP/AVP".equals(protocol)) {
                            String setup = mediaDescription.getAttribute("setup");
                            if (setup != null) {
                                mediaTransmissionTCP = true;
                                if ("active".equals(setup)) {
                                    tcpActive = true;
                                } else if ("passive".equals(setup)) {
                                    tcpActive = false;
                                }
                            }
                        }
                        break;
                    }
                }
                if (port == -1) {
                    logger.info("不支持的媒体格式,返回415");
                    // 回复不支持的格式
                    responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415
                    return;
                }
                String username = sdp.getOrigin().getUsername();
                String addressStr = sdp.getOrigin().getAddress();
                //String sessionName = sdp.getSessionName().getValue();
                logger.info("[上级点播]用户:{}, 地址:{}:{}, ssrc:{}", username, addressStr, port, ssrc);
                Device device  = null;
                // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
                if (channel != null) {
                    device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId);
                    if (device == null) {
                        logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel);
                        responseAck(evt, Response.SERVER_INTERNAL_ERROR);
                        return;
                    }
                    mediaServerItem = playService.getNewMediaServerItem(device);
                    if (mediaServerItem == null) {
                        logger.warn("未找到可用的zlm");
                        responseAck(evt, Response.BUSY_HERE);
                        return;
                    }
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                            device.getDeviceId(), channelId,
                            mediaTransmissionTCP);
                    if (tcpActive != null) {
                        sendRtpItem.setTcpActive(tcpActive);
                    }
                    if (sendRtpItem == null) {
                        logger.warn("服务器端口资源不足");
                        responseAck(evt, Response.BUSY_HERE);
                        return;
                    }
                    // 写入redis, 超时时回复
                    redisCatchStorage.updateSendRTPSever(sendRtpItem);
                    // 通知下级推流,
                    PlayResult playResult = playService.play(mediaServerItem,device.getDeviceId(), channelId, (mediaServerItemInUSe, responseJSON)->{
                        // 收到推流, 回复200OK, 等待ack
                        // if (sendRtpItem == null) return;
                        sendRtpItem.setStatus(1);
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
                        // TODO 添加对tcp的支持
                        StringBuffer content = new StringBuffer(200);
                        content.append("v=0\r\n");
                        content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n");
                        content.append("s=Play\r\n");
                        content.append("c=IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n");
                        content.append("t=0 0\r\n");
                        content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
                        content.append("a=sendonly\r\n");
                        content.append("a=rtpmap:96 PS/90000\r\n");
                        content.append("y="+ ssrc + "\r\n");
                        content.append("f=\r\n");
                        try {
                            responseAck(evt, content.toString());
                        } catch (SipException e) {
                            e.printStackTrace();
                        } catch (InvalidArgumentException e) {
                            e.printStackTrace();
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                    } ,((event) -> {
                        // 未知错误。直接转发设备点播的错误
                        Response response = null;
                        try {
                            response = getMessageFactory().createResponse(event.statusCode, evt.getRequest());
                            ServerTransaction serverTransaction = getServerTransaction(evt);
                            serverTransaction.sendResponse(response);
                            if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
                        } catch (ParseException | SipException | InvalidArgumentException e) {
                            e.printStackTrace();
                        }
                    }));
                    if (logger.isDebugEnabled()) {
                        logger.debug(playResult.getResult().toString());
                    }
                }else if (gbStream != null) {
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                            gbStream.getApp(), gbStream.getStream(), channelId,
                            mediaTransmissionTCP);
                    if (tcpActive != null) {
                        sendRtpItem.setTcpActive(tcpActive);
                    }
                    if (sendRtpItem == null) {
                        logger.warn("服务器端口资源不足");
                        responseAck(evt, Response.BUSY_HERE);
                        return;
                    }
                    // 写入redis, 超时时回复
                    redisCatchStorage.updateSendRTPSever(sendRtpItem);
                    sendRtpItem.setStatus(1);
                    redisCatchStorage.updateSendRTPSever(sendRtpItem);
                    // TODO 添加对tcp的支持
                    StringBuffer content = new StringBuffer(200);
                    content.append("v=0\r\n");
                    content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
                    content.append("s=Play\r\n");
                    content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
                    content.append("t=0 0\r\n");
                    content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
                    content.append("a=sendonly\r\n");
                    content.append("a=rtpmap:96 PS/90000\r\n");
                    content.append("y="+ ssrc + "\r\n");
                    content.append("f=\r\n");
                    try {
                        responseAck(evt, content.toString());
                    } catch (SipException e) {
                        e.printStackTrace();
                    } catch (InvalidArgumentException e) {
                        e.printStackTrace();
                    } catch (ParseException e) {
                        e.printStackTrace();
                    }
                }
            } else {
                // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
                Device device = storager.queryVideoDevice(requesterId);
                if (device != null) {
                    logger.info("收到设备" + requesterId + "的语音广播Invite请求");
                    responseAck(evt, Response.TRYING);
                    String contentString = new String(request.getRawContent());
                    // jainSip不支持y=字段, 移除移除以解析。
                    String substring = contentString;
                    String ssrc = "0000000404";
                    int ssrcIndex = contentString.indexOf("y=");
                    if (ssrcIndex > 0) {
                        substring = contentString.substring(0, ssrcIndex);
                        ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
                    }
                    ssrcIndex = substring.indexOf("f=");
                    if (ssrcIndex > 0) {
                        substring = contentString.substring(0, ssrcIndex);
                    }
                    SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
                    //  获取支持的格式
                    Vector mediaDescriptions = sdp.getMediaDescriptions(true);
                    // 查看是否支持PS 负载96
                    int port = -1;
                    //boolean recvonly = false;
                    boolean mediaTransmissionTCP = false;
                    Boolean tcpActive = null;
                    for (int i = 0; i < mediaDescriptions.size(); i++) {
                        MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i);
                        Media media = mediaDescription.getMedia();
                        Vector mediaFormats = media.getMediaFormats(false);
                        if (mediaFormats.contains("8")) {
                            port = media.getMediaPort();
                            String protocol = media.getProtocol();
                            // 区分TCP发流还是udp, 当前默认udp
                            if ("TCP/RTP/AVP".equals(protocol)) {
                                String setup = mediaDescription.getAttribute("setup");
                                if (setup != null) {
                                    mediaTransmissionTCP = true;
                                    if ("active".equals(setup)) {
                                        tcpActive = true;
                                    } else if ("passive".equals(setup)) {
                                        tcpActive = false;
                                    }
                                }
                            }
                            break;
                        }
                    }
                    if (port == -1) {
                        logger.info("不支持的媒体格式,返回415");
                        // 回复不支持的格式
                        responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415
                        return;
                    }
                    String username = sdp.getOrigin().getUsername();
                    String addressStr = sdp.getOrigin().getAddress();
                    logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc);
                } else {
                    logger.warn("来自无效设备/平台的请求");
                    responseAck(evt, Response.BAD_REQUEST);
                }
            }
        } catch (SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
            logger.warn("sdp解析错误");
            e.printStackTrace();
        } catch (SdpParseException e) {
            e.printStackTrace();
        } catch (SdpException e) {
            e.printStackTrace();
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/MessageRequestProcessor.java
New file
@@ -0,0 +1,1103 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.VManageBootstrap;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.callback.CheckForAllRecordsThread;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.gb28181.utils.DateUtil;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceAlarmService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.GpsUtil;
import com.genersoft.iot.vmp.utils.SpringBeanFactory;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.address.SipUri;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
/**
 * @description:MESSAGE请求处理器
 * @author: swwheihei
 * @date: 2020年5月3日 下午5:32:41
 */
@SuppressWarnings(value={"unchecked", "rawtypes"})
@Component
public class MessageRequestProcessor extends SIPRequestProcessorAbstract {
    public static volatile List<String> threadNameList = new ArrayList();
    private final static Logger logger = LoggerFactory.getLogger(MessageRequestProcessor.class);
    private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_";
    private static final String MESSAGE_KEEP_ALIVE = "Keepalive";
    private static final String MESSAGE_CONFIG_DOWNLOAD = "ConfigDownload";
    private static final String MESSAGE_CATALOG = "Catalog";
    private static final String MESSAGE_DEVICE_INFO = "DeviceInfo";
    private static final String MESSAGE_ALARM = "Alarm";
    private static final String MESSAGE_RECORD_INFO = "RecordInfo";
    private static final String MESSAGE_MEDIA_STATUS = "MediaStatus";
    private static final String MESSAGE_BROADCAST = "Broadcast";
    private static final String MESSAGE_DEVICE_STATUS = "DeviceStatus";
    private static final String MESSAGE_DEVICE_CONTROL = "DeviceControl";
    private static final String MESSAGE_DEVICE_CONFIG = "DeviceConfig";
    private static final String MESSAGE_MOBILE_POSITION = "MobilePosition";
    private static final String MESSAGE_PRESET_QUERY = "PresetQuery";
    private String method = "MESSAGE";
    @Autowired
    private UserSetup userSetup;
    @Autowired
    private SIPCommander cmder;
    @Autowired
    private SipConfig config;
    @Autowired
    private SIPCommanderFroPlatform cmderFroPlatform;
    @Autowired
    private IVideoManagerStorager storager;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private EventPublisher publisher;
    @Autowired
    private RedisUtil redis;
    @Autowired
    private DeferredResultHolder deferredResultHolder;
    @Autowired
    private DeviceOffLineDetector offLineDetector;
    @Autowired
    private IDeviceAlarmService deviceAlarmService;
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
        sipProcessorObserver.addRequestProcessor(method, this);
    }
    /**
     * 处理MESSAGE请求
     *
     * @param evt
     */
    @Override
    public void process(RequestEvent evt) {
        try {
            Element rootElement = getRootElement(evt);
            String cmd = getText(rootElement, "CmdType");
            if (MESSAGE_KEEP_ALIVE.equals(cmd)) {
                logger.debug("接收到KeepAlive消息");
                processMessageKeepAlive(evt);
            } else if (MESSAGE_CONFIG_DOWNLOAD.equals(cmd)) {
                logger.debug("接收到ConfigDownload消息");
                processMessageConfigDownload(evt);
            } else if (MESSAGE_CATALOG.equals(cmd)) {
                logger.debug("接收到Catalog消息");
                processMessageCatalogList(evt);
            } else if (MESSAGE_DEVICE_INFO.equals(cmd)) {
                // DeviceInfo消息处理
                processMessageDeviceInfo(evt);
            } else if (MESSAGE_DEVICE_STATUS.equals(cmd)) {
                // DeviceStatus消息处理
                processMessageDeviceStatus(evt);
            } else if (MESSAGE_DEVICE_CONTROL.equals(cmd)) {
                logger.debug("接收到DeviceControl消息");
                processMessageDeviceControl(evt);
            } else if (MESSAGE_DEVICE_CONFIG.equals(cmd)) {
                logger.info("接收到DeviceConfig消息");
                processMessageDeviceConfig(evt);
            } else if (MESSAGE_ALARM.equals(cmd)) {
                logger.debug("接收到Alarm消息");
                processMessageAlarm(evt);
            } else if (MESSAGE_RECORD_INFO.equals(cmd)) {
                logger.debug("接收到RecordInfo消息");
                processMessageRecordInfo(evt);
            }else if (MESSAGE_MEDIA_STATUS.equals(cmd)) {
                logger.debug("接收到MediaStatus消息");
                processMessageMediaStatus(evt);
            } else if (MESSAGE_MOBILE_POSITION.equals(cmd)) {
                logger.debug("接收到MobilePosition消息");
                processMessageMobilePosition(evt);
            } else if (MESSAGE_PRESET_QUERY.equals(cmd)) {
                logger.debug("接收到PresetQuery消息");
                processMessagePresetQuery(evt);
            } else if (MESSAGE_BROADCAST.equals(cmd)) {
                // Broadcast消息处理
                processMessageBroadcast(evt);
            } else {
                logger.debug("接收到消息:" + cmd);
                responseAck(evt, Response.OK);
            }
        } catch (DocumentException | SipException |InvalidArgumentException | ParseException e) {
            e.printStackTrace();
        }
    }
    /**
     * 处理MobilePosition移动位置消息
     *
     * @param evt
     */
    private void processMessageMobilePosition(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            Device device = storager.queryVideoDevice(deviceId);
            if (device == null) {
                logger.warn("处理MobilePosition移动位置消息时未找到设备信息");
                responseAck(evt, Response.NOT_FOUND);
                return;
            }
            Element rootElement = getRootElement(evt, device.getCharset());
            MobilePosition mobilePosition = new MobilePosition();
            if (!StringUtils.isEmpty(device.getName())) {
                mobilePosition.setDeviceName(device.getName());
            }
            mobilePosition.setDeviceId(deviceId);
            mobilePosition.setChannelId(getText(rootElement, "DeviceID"));
            mobilePosition.setTime(getText(rootElement, "Time"));
            mobilePosition.setLongitude(Double.parseDouble(getText(rootElement, "Longitude")));
            mobilePosition.setLatitude(Double.parseDouble(getText(rootElement, "Latitude")));
            if (NumericUtil.isDouble(getText(rootElement, "Speed"))) {
                mobilePosition.setSpeed(Double.parseDouble(getText(rootElement, "Speed")));
            } else {
                mobilePosition.setSpeed(0.0);
            }
            if (NumericUtil.isDouble(getText(rootElement, "Direction"))) {
                mobilePosition.setDirection(Double.parseDouble(getText(rootElement, "Direction")));
            } else {
                mobilePosition.setDirection(0.0);
            }
            if (NumericUtil.isDouble(getText(rootElement, "Altitude"))) {
                mobilePosition.setAltitude(Double.parseDouble(getText(rootElement, "Altitude")));
            } else {
                mobilePosition.setAltitude(0.0);
            }
            mobilePosition.setReportSource("Mobile Position");
            BaiduPoint bp = new BaiduPoint();
            bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude()));
            logger.info("百度坐标:" + bp.getBdLng() + ", " + bp.getBdLat());
            mobilePosition.setGeodeticSystem("BD-09");
            mobilePosition.setCnLng(bp.getBdLng());
            mobilePosition.setCnLat(bp.getBdLat());
            if (!userSetup.getSavePositionHistory()) {
                storager.clearMobilePositionsByDeviceId(deviceId);
            }
            storager.insertMobilePosition(mobilePosition);
            //回复 200 OK
            responseAck(evt, Response.OK);
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
        }
    }
    /**
     * 处理DeviceStatus设备状态Message
     *
     * @param evt
     */
    private void processMessageDeviceStatus(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            Device device = storager.queryVideoDevice(deviceId);
            if (device == null) {
                logger.warn("处理DeviceStatus设备状态Message时未找到设备信息");
                responseAck(evt, Response.NOT_FOUND);
                return;
            }
            Element rootElement = getRootElement(evt);
            String name = rootElement.getName();
            Element deviceIdElement = rootElement.element("DeviceID");
            String channelId = deviceIdElement.getText();
            if (name.equalsIgnoreCase("Query")) { // 区分是Response——查询响应,还是Query——查询请求
                logger.info("接收到DeviceStatus查询消息");
                FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
                String platformId = ((SipUri) fromHeader.getAddress().getURI()).getUser();
                    if (platformId == null) {
                    responseAck(evt, Response.NOT_FOUND);
                    return;
                } else {
                    // 回复200 OK
                    responseAck(evt, Response.OK);
                    String sn = rootElement.element("SN").getText();
                    ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
                    cmderFroPlatform.deviceStatusResponse(parentPlatform, sn, fromHeader.getTag());
                }
            } else {
                logger.info("接收到DeviceStatus应答消息");
                // 检查设备是否存在, 不存在则不回复
                if (storager.exists(deviceId)) {
                    // 回复200 OK
                    responseAck(evt, Response.OK);
                    JSONObject json = new JSONObject();
                    XmlUtil.node2Json(rootElement, json);
                    if (logger.isDebugEnabled()) {
                        logger.debug(json.toJSONString());
                    }
                    RequestMessage msg = new RequestMessage();
                    msg.setKey(DeferredResultHolder.CALLBACK_CMD_DEVICESTATUS + deviceId + channelId);
                    msg.setData(json);
                    deferredResultHolder.invokeAllResult(msg);
                    if (offLineDetector.isOnline(deviceId)) {
                        publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
                    } else {
                    }
                }
            }
        } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
            e.printStackTrace();
        }
    }
    /**
     * 处理DeviceControl设备状态Message
     *
     * @param evt
     */
    private void processMessageDeviceControl(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            Device device = storager.queryVideoDevice(deviceId);
            if (device == null) {
                logger.warn("处理DeviceControl设备状态Message未找到设备信息");
                responseAck(evt, Response.NOT_FOUND);
                return;
            }
            Element rootElement = getRootElement(evt);
            String channelId = getText(rootElement, "DeviceID");
            //String result = getText(rootElement, "Result");
            // 回复200 OK
            responseAck(evt, Response.OK);
            if (rootElement.getName().equals("Response")) {//} !StringUtils.isEmpty(result)) {
                // 此处是对本平台发出DeviceControl指令的应答
                JSONObject json = new JSONObject();
                XmlUtil.node2Json(rootElement, json);
                if (logger.isDebugEnabled()) {
                    logger.debug(json.toJSONString());
                }
                RequestMessage msg = new RequestMessage();
                String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL +  deviceId + channelId;
                msg.setKey(key);
                msg.setData(json);
                deferredResultHolder.invokeAllResult(msg);
            } else {
                // 此处是上级发出的DeviceControl指令
                String platformId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
                String targetGBId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
                // 远程启动功能
                if (!StringUtils.isEmpty(getText(rootElement, "TeleBoot"))) {
                    if (deviceId.equals(targetGBId)) {
                        // 远程启动本平台:需要在重新启动程序后先对SipStack解绑
                        logger.info("执行远程启动本平台命令");
                        ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
                        cmderFroPlatform.unregister(parentPlatform, null, null);
                        Thread restartThread = new Thread(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    Thread.sleep(3000);
                                    SipProvider up = (SipProvider) SpringBeanFactory.getBean("udpSipProvider");
                                    SipStackImpl stack = (SipStackImpl)up.getSipStack();
                                    stack.stop();
                                    Iterator listener = stack.getListeningPoints();
                                    while (listener.hasNext()) {
                                        stack.deleteListeningPoint((ListeningPoint) listener.next());
                                    }
                                    Iterator providers = stack.getSipProviders();
                                    while (providers.hasNext()) {
                                        stack.deleteSipProvider((SipProvider) providers.next());
                                    }
                                    VManageBootstrap.restart();
                                } catch (InterruptedException ignored) {
                                } catch (ObjectInUseException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                        restartThread.setDaemon(false);
                        restartThread.start();
                    } else {
                        // 远程启动指定设备
                    }
                }
                // 云台/前端控制命令
                if (!StringUtils.isEmpty(getText(rootElement,"PTZCmd")) && !deviceId.equals(targetGBId)) {
                    String cmdString = getText(rootElement,"PTZCmd");
                    Device deviceForPlatform = storager.queryVideoDeviceByPlatformIdAndChannelId(platformId, deviceId);
                    cmder.fronEndCmd(deviceForPlatform, deviceId, cmdString);
                }
            }
        } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
            e.printStackTrace();
        }
    }
    /**
     * 处理DeviceConfig设备状态Message
     *
     * @param evt
     */
    private void processMessageDeviceConfig(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            // 查询设备是否存在
            Device device = storager.queryVideoDevice(deviceId);
            if (device == null) {
                logger.warn("处理DeviceConfig设备状态Message消息时未找到设备信息");
                responseAck(evt, Response.NOT_FOUND);
                return;
            }
            Element rootElement = getRootElement(evt);
            String channelId = getText(rootElement, "DeviceID");
            // 回复200 OK
            responseAck(evt, Response.OK);
            if (rootElement.getName().equals("Response")) {
                    // 此处是对本平台发出DeviceControl指令的应答
                JSONObject json = new JSONObject();
                XmlUtil.node2Json(rootElement, json);
                if (logger.isDebugEnabled()) {
                    logger.debug(json.toJSONString());
                }
                String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONFIG + deviceId + channelId;
                RequestMessage msg = new RequestMessage();
                msg.setKey(key);
                msg.setData(json);
                deferredResultHolder.invokeAllResult(msg);
            } else {
                // 此处是上级发出的DeviceConfig指令
            }
        } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
            e.printStackTrace();
        }
    }
    /**
     * 处理ConfigDownload设备状态Message
     *
     * @param evt
     */
    private void processMessageConfigDownload(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            // 查询设备是否存在
            Device device = storager.queryVideoDevice(deviceId);
            if (device == null) {
                logger.warn("处理ConfigDownload设备状态Message时未找到设备信息");
                responseAck(evt, Response.NOT_FOUND);
                return;
            }
            Element rootElement = getRootElement(evt);
            String channelId = getText(rootElement, "DeviceID");
            String key = DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD + deviceId + channelId;
            // 回复200 OK
            responseAck(evt, Response.OK);
            if (rootElement.getName().equals("Response")) {
                    // 此处是对本平台发出DeviceControl指令的应答
                JSONObject json = new JSONObject();
                XmlUtil.node2Json(rootElement, json);
                if (logger.isDebugEnabled()) {
                    logger.debug(json.toJSONString());
                }
                RequestMessage msg = new RequestMessage();
                msg.setKey(key);
                msg.setData(json);
                deferredResultHolder.invokeAllResult(msg);
            } else {
                // 此处是上级发出的DeviceConfig指令
            }
        } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
            e.printStackTrace();
        }
    }
    /**
     * 处理PresetQuery预置位列表Message
     *
     * @param evt
     */
    private void processMessagePresetQuery(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            // 查询设备是否存在
            Device device = storager.queryVideoDevice(deviceId);
            if (device == null) {
                logger.warn("处理PresetQuery预置位列表Message时未找到设备信息");
                responseAck(evt, Response.NOT_FOUND);
                return;
            }
            Element rootElement = getRootElement(evt);
            String channelId = getText(rootElement, "DeviceID");
            String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + deviceId + channelId;
            // 回复200 OK
            responseAck(evt, Response.OK);
            if (rootElement.getName().equals("Response")) {//   !StringUtils.isEmpty(result)) {
                // 此处是对本平台发出DeviceControl指令的应答
                JSONObject json = new JSONObject();
                XmlUtil.node2Json(rootElement, json);
                if (logger.isDebugEnabled()) {
                    logger.debug(json.toJSONString());
                }
                RequestMessage msg = new RequestMessage();
                msg.setKey(key);
                msg.setData(json);
                deferredResultHolder.invokeAllResult(msg);
            } else {
                // 此处是上级发出的DeviceControl指令
            }
        } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
            e.printStackTrace();
        }
    }
    /**
     * 处理DeviceInfo设备信息Message
     *
     * @param evt
     */
    private void processMessageDeviceInfo(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            // 查询设备是否存在
            Device device = storager.queryVideoDevice(deviceId);
            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(deviceId);
            Element rootElement = getRootElement(evt);
            String requestName = rootElement.getName();
            Element deviceIdElement = rootElement.element("DeviceID");
            String channelId = deviceIdElement.getTextTrim();
            String key = DeferredResultHolder.CALLBACK_CMD_DEVICEINFO + deviceId + channelId;
            if (device != null ) {
                rootElement = getRootElement(evt, device.getCharset());
            }
            if (requestName.equals("Query")) {
                logger.info("接收到DeviceInfo查询消息");
                FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
                if (parentPlatform == null) {
                    responseAck(evt, Response.NOT_FOUND);
                    return;
                } else {
                    // 回复200 OK
                    responseAck(evt, Response.OK);
                    String sn = rootElement.element("SN").getText();
                    cmderFroPlatform.deviceInfoResponse(parentPlatform, sn, fromHeader.getTag());
                }
            } else {
                logger.debug("接收到DeviceInfo应答消息");
                if (device == null) {
                    logger.warn("处理DeviceInfo设备信息Message时未找到设备信息");
                    responseAck(evt, Response.NOT_FOUND);
                    return;
                }
                device.setName(getText(rootElement, "DeviceName"));
                device.setManufacturer(getText(rootElement, "Manufacturer"));
                device.setModel(getText(rootElement, "Model"));
                device.setFirmware(getText(rootElement, "Firmware"));
                if (StringUtils.isEmpty(device.getStreamMode())) {
                    device.setStreamMode("UDP");
                }
                storager.updateDevice(device);
                RequestMessage msg = new RequestMessage();
                msg.setKey(key);
                msg.setData(device);
                deferredResultHolder.invokeAllResult(msg);
                // 回复200 OK
                responseAck(evt, Response.OK);
                if (offLineDetector.isOnline(deviceId)) {
                    publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
                }
            }
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
        }
    }
    /***
     * 收到catalog设备目录列表请求 处理
     *
     * @param evt
     */
    private void processMessageCatalogList(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            // 查询设备是否存在
            Device device = storager.queryVideoDevice(deviceId);
            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(deviceId);
            Element rootElement = getRootElement(evt);
            String name = rootElement.getName();
            Element deviceIdElement = rootElement.element("DeviceID");
            String channelId = deviceIdElement.getText();
            Element deviceListElement = rootElement.element("DeviceList");
            String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            if (name.equalsIgnoreCase("Query")) { // 区分是Response——查询响应,还是Query——查询请求
                // TODO 后续将代码拆分
                if (parentPlatform == null) {
                    responseAck(evt, Response.NOT_FOUND);
                    return;
                } else {
                    // 回复200 OK
                    responseAck(evt, Response.OK);
                    Element snElement = rootElement.element("SN");
                    String sn = snElement.getText();
                    // 准备回复通道信息
                    List<ChannelReduce> channelReduces = storager.queryChannelListInParentPlatform(parentPlatform.getServerGBId());
                    // 查询关联的直播通道
                    List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId());
                    int size = channelReduces.size() + gbStreams.size();
                    // 回复级联的通道
                    if (channelReduces.size() > 0) {
                        for (ChannelReduce channelReduce : channelReduces) {
                            DeviceChannel deviceChannel = storager.queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId());
                            cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
                        }
                    }
                    // 回复直播的通道
                    if (gbStreams.size() > 0) {
                        for (GbStream gbStream : gbStreams) {
                            DeviceChannel deviceChannel = new DeviceChannel();
                            deviceChannel.setChannelId(gbStream.getGbId());
                            deviceChannel.setName(gbStream.getName());
                            deviceChannel.setLongitude(gbStream.getLongitude());
                            deviceChannel.setLatitude(gbStream.getLatitude());
                            deviceChannel.setDeviceId(parentPlatform.getDeviceGBId());
                            deviceChannel.setManufacture("wvp-pro");
                            deviceChannel.setStatus(gbStream.isStatus()?1:0);
//                            deviceChannel.setParentId(parentPlatform.getDeviceGBId());
                            deviceChannel.setRegisterWay(1);
                            deviceChannel.setCivilCode(config.getDomain());
                            deviceChannel.setModel("live");
                            deviceChannel.setOwner("wvp-pro");
//                            deviceChannel.setAddress("test");
                            deviceChannel.setParental(0);
                            deviceChannel.setSecrecy("0");
                            deviceChannel.setSecrecy("0");
                            cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
                        }
                    }
                    if (size == 0) {
                        // 回复无通道
                        cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromHeader.getTag(), size);
                    }
                }
            } else {
                if (device == null) {
                    logger.warn("收到catalog设备目录列表请求时未找到设备信息");
                    responseAck(evt, Response.NOT_FOUND);
                    return;
                }
                deviceListElement = getRootElement(evt, device.getCharset()).element("DeviceList");
                Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
                if (deviceListIterator != null) {
                    // 遍历DeviceList
                    while (deviceListIterator.hasNext()) {
                        Element itemDevice = deviceListIterator.next();
                        Element channelDeviceElement = itemDevice.element("DeviceID");
                        if (channelDeviceElement == null) {
                            continue;
                        }
                        String channelDeviceId = channelDeviceElement.getText();
                        Element channdelNameElement = itemDevice.element("Name");
                        String channelName = channdelNameElement != null ? channdelNameElement.getTextTrim().toString() : "";
                        Element statusElement = itemDevice.element("Status");
                        String status = statusElement != null ? statusElement.getText().toString() : "ON";
                        DeviceChannel deviceChannel = new DeviceChannel();
                        deviceChannel.setName(channelName);
                        deviceChannel.setChannelId(channelDeviceId);
                        // ONLINE OFFLINE  HIKVISION DS-7716N-E4 NVR的兼容性处理
                        if (status.equals("ON") || status.equals("On") || status.equals("ONLINE")) {
                            deviceChannel.setStatus(1);
                        }
                        if (status.equals("OFF") || status.equals("Off") || status.equals("OFFLINE")) {
                            deviceChannel.setStatus(0);
                        }
                        deviceChannel.setManufacture(getText(itemDevice, "Manufacturer"));
                        deviceChannel.setModel(getText(itemDevice, "Model"));
                        deviceChannel.setOwner(getText(itemDevice, "Owner"));
                        deviceChannel.setCivilCode(getText(itemDevice, "CivilCode"));
                        deviceChannel.setBlock(getText(itemDevice, "Block"));
                        deviceChannel.setAddress(getText(itemDevice, "Address"));
                        if (getText(itemDevice, "Parental") == null || getText(itemDevice, "Parental") == "") {
                            deviceChannel.setParental(0);
                        } else {
                            deviceChannel.setParental(Integer.parseInt(getText(itemDevice, "Parental")));
                        }
                        deviceChannel.setParentId(getText(itemDevice, "ParentID"));
                        if (getText(itemDevice, "SafetyWay") == null || getText(itemDevice, "SafetyWay") == "") {
                            deviceChannel.setSafetyWay(0);
                        } else {
                            deviceChannel.setSafetyWay(Integer.parseInt(getText(itemDevice, "SafetyWay")));
                        }
                        if (getText(itemDevice, "RegisterWay") == null || getText(itemDevice, "RegisterWay") == "") {
                            deviceChannel.setRegisterWay(1);
                        } else {
                            deviceChannel.setRegisterWay(Integer.parseInt(getText(itemDevice, "RegisterWay")));
                        }
                        deviceChannel.setCertNum(getText(itemDevice, "CertNum"));
                        if (getText(itemDevice, "Certifiable") == null || getText(itemDevice, "Certifiable") == "") {
                            deviceChannel.setCertifiable(0);
                        } else {
                            deviceChannel.setCertifiable(Integer.parseInt(getText(itemDevice, "Certifiable")));
                        }
                        if (getText(itemDevice, "ErrCode") == null || getText(itemDevice, "ErrCode") == "") {
                            deviceChannel.setErrCode(0);
                        } else {
                            deviceChannel.setErrCode(Integer.parseInt(getText(itemDevice, "ErrCode")));
                        }
                        deviceChannel.setEndTime(getText(itemDevice, "EndTime"));
                        deviceChannel.setSecrecy(getText(itemDevice, "Secrecy"));
                        deviceChannel.setIpAddress(getText(itemDevice, "IPAddress"));
                        if (getText(itemDevice, "Port") == null || getText(itemDevice, "Port") == "") {
                            deviceChannel.setPort(0);
                        } else {
                            deviceChannel.setPort(Integer.parseInt(getText(itemDevice, "Port")));
                        }
                        deviceChannel.setPassword(getText(itemDevice, "Password"));
                        if (NumericUtil.isDouble(getText(itemDevice, "Longitude"))) {
                            deviceChannel.setLongitude(Double.parseDouble(getText(itemDevice, "Longitude")));
                        } else {
                            deviceChannel.setLongitude(0.00);
                        }
                        if (NumericUtil.isDouble(getText(itemDevice, "Latitude"))) {
                            deviceChannel.setLatitude(Double.parseDouble(getText(itemDevice, "Latitude")));
                        } else {
                            deviceChannel.setLatitude(0.00);
                        }
                        if (getText(itemDevice, "PTZType") == null || getText(itemDevice, "PTZType") == "") {
                            deviceChannel.setPTZType(0);
                        } else {
                            deviceChannel.setPTZType(Integer.parseInt(getText(itemDevice, "PTZType")));
                        }
                        deviceChannel.setHasAudio(true); // 默认含有音频,播放时再检查是否有音频及是否AAC
                        storager.updateChannel(device.getDeviceId(), deviceChannel);
                    }
                    RequestMessage msg = new RequestMessage();
                    msg.setKey(key);
                    msg.setData(device);
                    deferredResultHolder.invokeAllResult(msg);
                    // 回复200 OK
                    responseAck(evt, Response.OK);
                    if (offLineDetector.isOnline(deviceId)) {
                        publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
                    }
                }
            }
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
        }
    }
    /***
     * 收到alarm设备报警信息 处理
     *
     * @param evt
     */
    private void processMessageAlarm(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            Device device = storager.queryVideoDevice(deviceId);
            if (device == null) {
                logger.warn("处理alarm设备报警信息未找到设备信息");
                responseAck(evt, Response.NOT_FOUND);
                return;
            }
            Element rootElement = getRootElement(evt, device.getCharset());
            Element deviceIdElement = rootElement.element("DeviceID");
            String channelId = deviceIdElement.getText().toString();
            String key = DeferredResultHolder.CALLBACK_CMD_ALARM + deviceId + channelId;
            // 回复200 OK
            responseAck(evt, Response.OK);
            if (device.getCharset() != null) {
                rootElement = getRootElement(evt, device.getCharset());
            }
            if (rootElement.getName().equals("Notify")) {    // 处理报警通知
                DeviceAlarm deviceAlarm = new DeviceAlarm();
                deviceAlarm.setDeviceId(deviceId);
                deviceAlarm.setChannelId(channelId);
                deviceAlarm.setAlarmPriority(getText(rootElement, "AlarmPriority"));
                deviceAlarm.setAlarmMethod(getText(rootElement, "AlarmMethod"));
                deviceAlarm.setAlarmTime(getText(rootElement, "AlarmTime"));
                if (getText(rootElement, "AlarmDescription") == null) {
                    deviceAlarm.setAlarmDescription("");
                } else {
                    deviceAlarm.setAlarmDescription(getText(rootElement, "AlarmDescription"));
                }
                if (NumericUtil.isDouble(getText(rootElement, "Longitude"))) {
                    deviceAlarm.setLongitude(Double.parseDouble(getText(rootElement, "Longitude")));
                } else {
                    deviceAlarm.setLongitude(0.00);
                }
                if (NumericUtil.isDouble(getText(rootElement, "Latitude"))) {
                    deviceAlarm.setLatitude(Double.parseDouble(getText(rootElement, "Latitude")));
                } else {
                    deviceAlarm.setLatitude(0.00);
                }
                if (!StringUtils.isEmpty(deviceAlarm.getAlarmMethod())) {
                    if ( deviceAlarm.getAlarmMethod().equals("4")) {
                        MobilePosition mobilePosition = new MobilePosition();
                        mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
                        mobilePosition.setTime(deviceAlarm.getAlarmTime());
                        mobilePosition.setLongitude(deviceAlarm.getLongitude());
                        mobilePosition.setLatitude(deviceAlarm.getLatitude());
                        mobilePosition.setReportSource("GPS Alarm");
                        BaiduPoint bp = new BaiduPoint();
                        bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude()));
                        logger.info("百度坐标:" + bp.getBdLng() + ", " + bp.getBdLat());
                        mobilePosition.setGeodeticSystem("BD-09");
                        mobilePosition.setCnLng(bp.getBdLng());
                        mobilePosition.setCnLat(bp.getBdLat());
                        if (!userSetup.getSavePositionHistory()) {
                            storager.clearMobilePositionsByDeviceId(deviceId);
                        }
                        storager.insertMobilePosition(mobilePosition);
                    }
                }
                logger.debug("存储报警信息、报警分类");
                // 存储报警信息、报警分类
                deviceAlarmService.add(deviceAlarm);
                if (offLineDetector.isOnline(deviceId)) {
                    publisher.deviceAlarmEventPublish(deviceAlarm);
                }
            } else if (rootElement.getName().equals("Response")) {    // 处理报警查询响应
                JSONObject json = new JSONObject();
                XmlUtil.node2Json(rootElement, json);
                if (logger.isDebugEnabled()) {
                    logger.debug(json.toJSONString());
                }
                RequestMessage msg = new RequestMessage();
                msg.setKey(key);
                msg.setData(json);
                deferredResultHolder.invokeAllResult(msg);
            }
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
        }
    }
    /***
     * 收到keepalive请求 处理
     *
     * @param evt
     */
    private void processMessageKeepAlive(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            // 查询设备是否存在
            Device device = storager.queryVideoDevice(deviceId);
            Element rootElement = getRootElement(evt);
            String channelId = getText(rootElement, "DeviceID");
            // 检查设备是否存在并在线, 不在线则设置为在线
            if (device != null ) {
                // 回复200 OK
                responseAck(evt, Response.OK);
                publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE);
            }else{
                logger.warn("收到[ "+deviceId+" ]心跳信息, 但是设备不存在, 回复404");
                Response response = getMessageFactory().createResponse(Response.NOT_FOUND, evt.getRequest());
                ServerTransaction serverTransaction = getServerTransaction(evt);
                serverTransaction.sendResponse(response);
                if (serverTransaction.getDialog() != null) {
                    serverTransaction.getDialog().delete();
                }
            }
//            if (device != null && device.getOnline() == 1) {
//
//                if (offLineDetector.isOnline(deviceId)) {
//                    publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE);
//                } else {
//                }
//            }else {
////                logger.warn("收到[ "+deviceId+" ]心跳信息, 但是设备" + (device == null? "不存在":"离线") + ", 回复401");
////                Response response = getMessageFactory().createResponse(Response.UNAUTHORIZED, evt.getRequest());
////                getServerTransaction(evt).sendResponse(response);
//                publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE);
//
//            }
        } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
            e.printStackTrace();
        }
    }
    /***
     * 处理RecordInfo设备录像列表Message请求 TODO 过期时间暂时写死180秒,后续与DeferredResult超时时间保持一致
     *
     * @param evt
     */
    private void processMessageRecordInfo(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            // 查询设备是否存在
            Device device = storager.queryVideoDevice(deviceId);
            if (device == null) {
                logger.warn("处理DeviceInfo设备信息Message时未找到设备信息");
                responseAck(evt, Response.NOT_FOUND);
                return;
            }
            // 回复200 OK
            responseAck(evt, Response.OK);
            String uuid = UUID.randomUUID().toString().replace("-", "");
            RecordInfo recordInfo = new RecordInfo();
            Element rootElement = getRootElement(evt);
            Element deviceIdElement = rootElement.element("DeviceID");
            String channelId = deviceIdElement.getText().toString();
            String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + channelId;
            if (device != null ) {
                rootElement = getRootElement(evt, device.getCharset());
            }
            recordInfo.setDeviceId(deviceId);
            recordInfo.setChannelId(channelId);
            recordInfo.setName(getText(rootElement, "Name"));
            if (getText(rootElement, "SumNum")== null || getText(rootElement, "SumNum") =="") {
                recordInfo.setSumNum(0);
            } else {
                recordInfo.setSumNum(Integer.parseInt(getText(rootElement, "SumNum")));
            }
            String sn = getText(rootElement, "SN");
            Element recordListElement = rootElement.element("RecordList");
            if (recordListElement == null || recordInfo.getSumNum() == 0) {
                logger.info("无录像数据");
                RequestMessage msg = new RequestMessage();
                msg.setKey(key);
                msg.setData(recordInfo);
                deferredResultHolder.invokeAllResult(msg);
            } else {
                Iterator<Element> recordListIterator = recordListElement.elementIterator();
                List<RecordItem> recordList = new ArrayList<RecordItem>();
                if (recordListIterator != null) {
                    RecordItem record = new RecordItem();
                    logger.info("处理录像列表数据...");
                    // 遍历DeviceList
                    while (recordListIterator.hasNext()) {
                        Element itemRecord = recordListIterator.next();
                        Element recordElement = itemRecord.element("DeviceID");
                        if (recordElement == null) {
                            logger.info("记录为空,下一个...");
                            continue;
                        }
                        record = new RecordItem();
                        record.setDeviceId(getText(itemRecord, "DeviceID"));
                        record.setName(getText(itemRecord, "Name"));
                        record.setFilePath(getText(itemRecord, "FilePath"));
                        record.setAddress(getText(itemRecord, "Address"));
                        record.setStartTime(
                                DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(getText(itemRecord, "StartTime")));
                        record.setEndTime(
                                DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(getText(itemRecord, "EndTime")));
                        record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
                                : Integer.parseInt(getText(itemRecord, "Secrecy")));
                        record.setType(getText(itemRecord, "Type"));
                        record.setRecorderId(getText(itemRecord, "RecorderID"));
                        recordList.add(record);
                    }
                    recordInfo.setRecordList(recordList);
                }
                // 改用单独线程统计已获取录像文件数量,避免多包并行分别统计不完整的问题
                String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn;
                redis.set(cacheKey + "_" + uuid, recordList, 90);
                if (!threadNameList.contains(cacheKey)) {
                    threadNameList.add(cacheKey);
                    CheckForAllRecordsThread chk = new CheckForAllRecordsThread(cacheKey, recordInfo);
                    chk.setName(cacheKey);
                    chk.setDeferredResultHolder(deferredResultHolder);
                    chk.setRedis(redis);
                    chk.setLogger(logger);
                    chk.start();
                    if (logger.isDebugEnabled()) {
                        logger.debug("Start Thread " + cacheKey + ".");
                    }
                } else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Thread " + cacheKey + " already started.");
                    }
                }
                // 存在录像且如果当前录像明细个数小于总条数,说明拆包返回,需要组装,暂不返回
                // if (recordInfo.getSumNum() > 0 && recordList.size() > 0 && recordList.size() < recordInfo.getSumNum()) {
                //     // 为防止连续请求该设备的录像数据,返回数据错乱,特增加sn进行区分
                //     String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn;
                //     redis.set(cacheKey + "_" + uuid, recordList, 90);
                //     List<Object> cacheKeys = redis.scan(cacheKey + "_*");
                //     List<RecordItem> totalRecordList = new ArrayList<RecordItem>();
                //     for (int i = 0; i < cacheKeys.size(); i++) {
                //         totalRecordList.addAll((List<RecordItem>) redis.get(cacheKeys.get(i).toString()));
                //     }
                //     if (totalRecordList.size() < recordInfo.getSumNum()) {
                //         logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + recordInfo.getSumNum() + "项");
                //         return;
                //     }
                //     logger.info("录像数据已全部获取,共" + recordInfo.getSumNum() + "项");
                //     recordInfo.setRecordList(totalRecordList);
                //     for (int i = 0; i < cacheKeys.size(); i++) {
                //         redis.del(cacheKeys.get(i).toString());
                //     }
                // }
                // // 自然顺序排序, 元素进行升序排列
                // recordInfo.getRecordList().sort(Comparator.naturalOrder());
            }
            // 走到这里,有以下可能:1、没有录像信息,第一次收到recordinfo的消息即返回响应数据,无redis操作
            // 2、有录像数据,且第一次即收到完整数据,返回响应数据,无redis操作
            // 3、有录像数据,在超时时间内收到多次包组装后数量足够,返回数据
            // RequestMessage msg = new RequestMessage();
            // msg.setDeviceId(deviceId);
            // msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO);
            // msg.setData(recordInfo);
            // deferredResultHolder.invokeResult(msg);
            // logger.info("处理完成,返回结果");
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
        }
    }
    /**
     * 收到MediaStatus消息处理
      *
      * @param evt
      */
    private void processMessageMediaStatus(RequestEvent evt){
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            // 查询设备是否存在
            Device device = storager.queryVideoDevice(deviceId);
            if (device == null) {
                logger.warn("处理DeviceInfo设备信息Message时未找到设备信息");
                responseAck(evt, Response.NOT_FOUND);
                return;
            }
            // 回复200 OK
            responseAck(evt, Response.OK);
            Element rootElement = getRootElement(evt);
            String channelId = getText(rootElement, "DeviceID");
            String NotifyType =getText(rootElement, "NotifyType");
            if (NotifyType.equals("121")){
                logger.info("媒体播放完毕,通知关流");
                StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, "*");
                if (streamInfo != null) {
                    redisCatchStorage.stopPlayback(streamInfo);
                    cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId());
                }
            }
        } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
            e.printStackTrace();
        }
    }
    /**
     * 处理AudioBroadcast语音广播Message
     *
     * @param evt
     */
    private void processMessageBroadcast(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            // 查询设备是否存在
            Device device = storager.queryVideoDevice(deviceId);
            if (device == null) {
                logger.warn("处理DeviceInfo设备信息Message时未找到设备信息");
                responseAck(evt, Response.NOT_FOUND);
                return;
            }
            Element rootElement = getRootElement(evt);
            String channelId = getText(rootElement, "DeviceID");
            String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + deviceId + channelId;
            // 回复200 OK
            responseAck(evt, Response.OK);
            if (rootElement.getName().equals("Response")) {
                    // 此处是对本平台发出Broadcast指令的应答
                JSONObject json = new JSONObject();
                XmlUtil.node2Json(rootElement, json);
                if (logger.isDebugEnabled()) {
                    logger.debug(json.toJSONString());
                }
                RequestMessage msg = new RequestMessage();
                msg.setKey(key);
                msg.setData(json);
                deferredResultHolder.invokeAllResult(msg);
            } else {
                // 此处是上级发出的Broadcast指令
            }
        } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
            e.printStackTrace();
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
New file
@@ -0,0 +1,384 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.GpsUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.Iterator;
/**
 * @description: Notify请求处理器
 * @author: lawrencehj
 * @date: 2021年1月27日
 */
@Component
public class NotifyRequestProcessor extends SIPRequestProcessorAbstract {
    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestProcessor.class);
    @Autowired
    private UserSetup userSetup;
    @Autowired
    private IVideoManagerStorager storager;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private EventPublisher publisher;
    @Autowired
    private DeviceOffLineDetector offLineDetector;
    private static final String NOTIFY_CATALOG = "Catalog";
    private static final String NOTIFY_ALARM = "Alarm";
    private static final String NOTIFY_MOBILE_POSITION = "MobilePosition";
    private String method = "NOTIFY";
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
        sipProcessorObserver.addRequestProcessor(method, this);
    }
    @Override
    public void process(RequestEvent evt) {
        try {
            Element rootElement = getRootElement(evt);
            String cmd = XmlUtil.getText(rootElement, "CmdType");
            if (NOTIFY_CATALOG.equals(cmd)) {
                logger.info("接收到Catalog通知");
                processNotifyCatalogList(evt);
            } else if (NOTIFY_ALARM.equals(cmd)) {
                logger.info("接收到Alarm通知");
                processNotifyAlarm(evt);
            } else if (NOTIFY_MOBILE_POSITION.equals(cmd)) {
                logger.info("接收到MobilePosition通知");
                processNotifyMobilePosition(evt);
            } else {
                logger.info("接收到消息:" + cmd);
                responseAck(evt, Response.OK);
            }
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
        }
    }
    /**
     * 处理MobilePosition移动位置Notify
     *
     * @param evt
     */
    private void processNotifyMobilePosition(RequestEvent evt) {
        try {
            // 回复 200 OK
            Element rootElement = getRootElement(evt);
            MobilePosition mobilePosition = new MobilePosition();
            Element deviceIdElement = rootElement.element("DeviceID");
            String deviceId = deviceIdElement.getTextTrim().toString();
            Device device = storager.queryVideoDevice(deviceId);
            if (device != null) {
                if (!StringUtils.isEmpty(device.getName())) {
                    mobilePosition.setDeviceName(device.getName());
                }
            }
            mobilePosition.setDeviceId(XmlUtil.getText(rootElement, "DeviceID"));
            mobilePosition.setTime(XmlUtil.getText(rootElement, "Time"));
            mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
            mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) {
                mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed")));
            } else {
                mobilePosition.setSpeed(0.0);
            }
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) {
                mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction")));
            } else {
                mobilePosition.setDirection(0.0);
            }
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) {
                mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude")));
            } else {
                mobilePosition.setAltitude(0.0);
            }
            mobilePosition.setReportSource("Mobile Position");
            BaiduPoint bp = new BaiduPoint();
            bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude()));
            logger.info("百度坐标:" + bp.getBdLng() + ", " + bp.getBdLat());
            mobilePosition.setGeodeticSystem("BD-09");
            mobilePosition.setCnLng(bp.getBdLng());
            mobilePosition.setCnLat(bp.getBdLat());
            if (!userSetup.getSavePositionHistory()) {
                storager.clearMobilePositionsByDeviceId(deviceId);
            }
            storager.insertMobilePosition(mobilePosition);
            responseAck(evt, Response.OK);
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
        }
    }
    /***
     * 处理alarm设备报警Notify
     *
     * @param evt
     */
    private void processNotifyAlarm(RequestEvent evt) {
        try {
            Element rootElement = getRootElement(evt);
            Element deviceIdElement = rootElement.element("DeviceID");
            String deviceId = deviceIdElement.getText().toString();
            Device device = storager.queryVideoDevice(deviceId);
            if (device == null) {
                return;
            }
            rootElement = getRootElement(evt, device.getCharset());
            DeviceAlarm deviceAlarm = new DeviceAlarm();
            deviceAlarm.setDeviceId(deviceId);
            deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
            deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
            deviceAlarm.setAlarmTime(XmlUtil.getText(rootElement, "AlarmTime"));
            if (XmlUtil.getText(rootElement, "AlarmDescription") == null) {
                deviceAlarm.setAlarmDescription("");
            } else {
                deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription"));
            }
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) {
                deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
            } else {
                deviceAlarm.setLongitude(0.00);
            }
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) {
                deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
            } else {
                deviceAlarm.setLatitude(0.00);
            }
            if (deviceAlarm.getAlarmMethod().equals("4")) {
                MobilePosition mobilePosition = new MobilePosition();
                mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
                mobilePosition.setTime(deviceAlarm.getAlarmTime());
                mobilePosition.setLongitude(deviceAlarm.getLongitude());
                mobilePosition.setLatitude(deviceAlarm.getLatitude());
                mobilePosition.setReportSource("GPS Alarm");
                BaiduPoint bp = new BaiduPoint();
                bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude()));
                logger.info("百度坐标:" + bp.getBdLng() + ", " + bp.getBdLat());
                mobilePosition.setGeodeticSystem("BD-09");
                mobilePosition.setCnLng(bp.getBdLng());
                mobilePosition.setCnLat(bp.getBdLat());
                if (!userSetup.getSavePositionHistory()) {
                    storager.clearMobilePositionsByDeviceId(deviceId);
                }
                storager.insertMobilePosition(mobilePosition);
            }
            // TODO: 需要实现存储报警信息、报警分类
            // 回复200 OK
            responseAck(evt, Response.OK);
            if (offLineDetector.isOnline(deviceId)) {
                publisher.deviceAlarmEventPublish(deviceAlarm);
            }
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
        }
    }
    /***
     * 处理catalog设备目录列表Notify
     *
     * @param evt
     */
    private void processNotifyCatalogList(RequestEvent evt) {
        try {
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
            Element rootElement = getRootElement(evt);
            Element deviceIdElement = rootElement.element("DeviceID");
            String channelId = deviceIdElement.getText();
            Device device = storager.queryVideoDevice(deviceId);
            if (device == null) {
                return;
            }
            if (device != null ) {
                rootElement = getRootElement(evt, device.getCharset());
            }
            Element deviceListElement = rootElement.element("DeviceList");
            if (deviceListElement == null) {
                return;
            }
            Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
            if (deviceListIterator != null) {
                // 遍历DeviceList
                while (deviceListIterator.hasNext()) {
                    Element itemDevice = deviceListIterator.next();
                    Element channelDeviceElement = itemDevice.element("DeviceID");
                    if (channelDeviceElement == null) {
                        continue;
                    }
                    String channelDeviceId = channelDeviceElement.getTextTrim();
                    Element channdelNameElement = itemDevice.element("Name");
                    String channelName = channdelNameElement != null ? channdelNameElement.getTextTrim().toString() : "";
                    Element statusElement = itemDevice.element("Status");
                    String status = statusElement != null ? statusElement.getTextTrim().toString() : "ON";
                    DeviceChannel deviceChannel = new DeviceChannel();
                    deviceChannel.setName(channelName);
                    deviceChannel.setChannelId(channelDeviceId);
                    // ONLINE OFFLINE HIKVISION DS-7716N-E4 NVR的兼容性处理
                    if (status.equals("ON") || status.equals("On") || status.equals("ONLINE")) {
                        deviceChannel.setStatus(1);
                    }
                    if (status.equals("OFF") || status.equals("Off") || status.equals("OFFLINE")) {
                        deviceChannel.setStatus(0);
                    }
                    deviceChannel.setManufacture(XmlUtil.getText(itemDevice, "Manufacturer"));
                    deviceChannel.setModel(XmlUtil.getText(itemDevice, "Model"));
                    deviceChannel.setOwner(XmlUtil.getText(itemDevice, "Owner"));
                    deviceChannel.setCivilCode(XmlUtil.getText(itemDevice, "CivilCode"));
                    deviceChannel.setBlock(XmlUtil.getText(itemDevice, "Block"));
                    deviceChannel.setAddress(XmlUtil.getText(itemDevice, "Address"));
                    if (XmlUtil.getText(itemDevice, "Parental") == null
                            || XmlUtil.getText(itemDevice, "Parental") == "") {
                        deviceChannel.setParental(0);
                    } else {
                        deviceChannel.setParental(Integer.parseInt(XmlUtil.getText(itemDevice, "Parental")));
                    }
                    deviceChannel.setParentId(XmlUtil.getText(itemDevice, "ParentID"));
                    if (XmlUtil.getText(itemDevice, "SafetyWay") == null
                            || XmlUtil.getText(itemDevice, "SafetyWay") == "") {
                        deviceChannel.setSafetyWay(0);
                    } else {
                        deviceChannel.setSafetyWay(Integer.parseInt(XmlUtil.getText(itemDevice, "SafetyWay")));
                    }
                    if (XmlUtil.getText(itemDevice, "RegisterWay") == null
                            || XmlUtil.getText(itemDevice, "RegisterWay") == "") {
                        deviceChannel.setRegisterWay(1);
                    } else {
                        deviceChannel.setRegisterWay(Integer.parseInt(XmlUtil.getText(itemDevice, "RegisterWay")));
                    }
                    deviceChannel.setCertNum(XmlUtil.getText(itemDevice, "CertNum"));
                    if (XmlUtil.getText(itemDevice, "Certifiable") == null
                            || XmlUtil.getText(itemDevice, "Certifiable") == "") {
                        deviceChannel.setCertifiable(0);
                    } else {
                        deviceChannel.setCertifiable(Integer.parseInt(XmlUtil.getText(itemDevice, "Certifiable")));
                    }
                    if (XmlUtil.getText(itemDevice, "ErrCode") == null
                            || XmlUtil.getText(itemDevice, "ErrCode") == "") {
                        deviceChannel.setErrCode(0);
                    } else {
                        deviceChannel.setErrCode(Integer.parseInt(XmlUtil.getText(itemDevice, "ErrCode")));
                    }
                    deviceChannel.setEndTime(XmlUtil.getText(itemDevice, "EndTime"));
                    deviceChannel.setSecrecy(XmlUtil.getText(itemDevice, "Secrecy"));
                    deviceChannel.setIpAddress(XmlUtil.getText(itemDevice, "IPAddress"));
                    if (XmlUtil.getText(itemDevice, "Port") == null || XmlUtil.getText(itemDevice, "Port") == "") {
                        deviceChannel.setPort(0);
                    } else {
                        deviceChannel.setPort(Integer.parseInt(XmlUtil.getText(itemDevice, "Port")));
                    }
                    deviceChannel.setPassword(XmlUtil.getText(itemDevice, "Password"));
                    if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Longitude"))) {
                        deviceChannel.setLongitude(Double.parseDouble(XmlUtil.getText(itemDevice, "Longitude")));
                    } else {
                        deviceChannel.setLongitude(0.00);
                    }
                    if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Latitude"))) {
                        deviceChannel.setLatitude(Double.parseDouble(XmlUtil.getText(itemDevice, "Latitude")));
                    } else {
                        deviceChannel.setLatitude(0.00);
                    }
                    if (XmlUtil.getText(itemDevice, "PTZType") == null
                            || XmlUtil.getText(itemDevice, "PTZType") == "") {
                        deviceChannel.setPTZType(0);
                    } else {
                        deviceChannel.setPTZType(Integer.parseInt(XmlUtil.getText(itemDevice, "PTZType")));
                    }
                    deviceChannel.setHasAudio(true); // 默认含有音频,播放时再检查是否有音频及是否AAC
                    storager.updateChannel(device.getDeviceId(), deviceChannel);
                }
                // RequestMessage msg = new RequestMessage();
                // msg.setDeviceId(deviceId);
                // msg.setType(DeferredResultHolder.CALLBACK_CMD_CATALOG);
                // msg.setData(device);
                // deferredResultHolder.invokeResult(msg);
                // 回复200 OK
                responseAck(evt, Response.OK);
                if (offLineDetector.isOnline(deviceId)) {
                    publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
                }
            }
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
        }
    }
    public void setCmder(SIPCommander cmder) {
    }
    public void setStorager(IVideoManagerStorager storager) {
        this.storager = storager;
    }
    public void setPublisher(EventPublisher publisher) {
        this.publisher = publisher;
    }
    public void setRedis(RedisUtil redis) {
    }
    public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) {
    }
    public void setOffLineDetector(DeviceOffLineDetector offLineDetector) {
        this.offLineDetector = offLineDetector;
    }
    public IRedisCatchStorage getRedisCatchStorage() {
        return redisCatchStorage;
    }
    public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
        this.redisCatchStorage = redisCatchStorage;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
New file
@@ -0,0 +1,197 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.auth.DigestServerAuthenticationHelper;
import com.genersoft.iot.vmp.gb28181.auth.RegisterLogicHandler;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.WvpSipDate;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import gov.nist.javax.sip.RequestEventExt;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
import gov.nist.javax.sip.header.Expires;
import gov.nist.javax.sip.header.SIPDateHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.header.*;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.security.NoSuchAlgorithmException;
import java.text.ParseException;
import java.util.Calendar;
import java.util.Locale;
/**
 * @description:收到注册请求 处理
 * @author: swwheihei
 * @date:   2020年5月3日 下午4:47:25
 */
@Component
public class RegisterRequestProcessor extends SIPRequestProcessorAbstract {
    private Logger logger = LoggerFactory.getLogger(RegisterRequestProcessor.class);
    public String method = "REGISTER";
    @Autowired
    private SipConfig sipConfig;
    @Autowired
    private RegisterLogicHandler handler;
    @Autowired
    private IVideoManagerStorager storager;
    @Autowired
    private EventPublisher publisher;
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
        sipProcessorObserver.addRequestProcessor(method, this);
    }
    /**
     * 收到注册请求 处理
      * @param evt
     */
    @Override
    public void process(RequestEvent evt) {
        try {
            RequestEventExt evtExt = (RequestEventExt)evt;
            String requestAddress = evtExt.getRemoteIpAddress() + ":" + evtExt.getRemotePort();
            logger.info("[{}] 收到注册请求,开始处理", requestAddress);
            Request request = evt.getRequest();
            Response response = null;
            boolean passwordCorrect = false;
            // 注册标志  0:未携带授权头或者密码错误  1:注册成功   2:注销成功
            int registerFlag = 0;
            FromHeader fromHeader = (FromHeader) request.getHeader(FromHeader.NAME);
            AddressImpl address = (AddressImpl) fromHeader.getAddress();
            SipUri uri = (SipUri) address.getURI();
            String deviceId = uri.getUser();
            Device device = storager.queryVideoDevice(deviceId);
            AuthorizationHeader authorhead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME);
            // 校验密码是否正确
            if (authorhead != null) {
                passwordCorrect = new DigestServerAuthenticationHelper().doAuthenticatePlainTextPassword(request,
                        sipConfig.getPassword());
            }
            if (StringUtils.isEmpty(sipConfig.getPassword())){
                passwordCorrect = true;
            }
            // 未携带授权头或者密码错误 回复401
            if (authorhead == null ) {
                logger.info("[{}] 未携带授权头 回复401", requestAddress);
                response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request);
                new DigestServerAuthenticationHelper().generateChallenge(getHeaderFactory(), response, sipConfig.getDomain());
            }else {
                if (!passwordCorrect){
                    // 注册失败
                    response = getMessageFactory().createResponse(Response.FORBIDDEN, request);
                    response.setReasonPhrase("wrong password");
                    logger.info("[{}] 密码/SIP服务器ID错误, 回复403", requestAddress);
                }else {
                    // 携带授权头并且密码正确
                    response = getMessageFactory().createResponse(Response.OK, request);
                    // 添加date头
                    SIPDateHeader dateHeader = new SIPDateHeader();
                    // 使用自己修改的
                    WvpSipDate wvpSipDate = new WvpSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis());
                    dateHeader.setDate(wvpSipDate);
                    response.addHeader(dateHeader);
                    ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME);
                    if (expiresHeader == null) {
                        response = getMessageFactory().createResponse(Response.BAD_REQUEST, request);
                        ServerTransaction serverTransaction = getServerTransaction(evt);
                        serverTransaction.sendResponse(response);
                        if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
                        return;
                    }
                    // 添加Contact头
                    response.addHeader(request.getHeader(ContactHeader.NAME));
                    // 添加Expires头
                    response.addHeader(request.getExpires());
                    // 获取到通信地址等信息
                    ViaHeader viaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
                    String received = viaHeader.getReceived();
                    int rPort = viaHeader.getRPort();
                    // 解析本地地址替代
                    if (StringUtils.isEmpty(received) || rPort == -1) {
                        received = viaHeader.getHost();
                        rPort = viaHeader.getPort();
                    }
                    //
                    if (device == null) {
                        device = new Device();
                        device.setStreamMode("UDP");
                        device.setCharset("gb2312");
                        device.setDeviceId(deviceId);
                        device.setFirsRegister(true);
                    }
                    device.setIp(received);
                    device.setPort(rPort);
                    device.setHostAddress(received.concat(":").concat(String.valueOf(rPort)));
                    // 注销成功
                    if (expiresHeader.getExpires() == 0) {
                        registerFlag = 2;
                    }
                    // 注册成功
                    else {
                        device.setExpires(expiresHeader.getExpires());
                        registerFlag = 1;
                        // 判断TCP还是UDP
                        boolean isTcp = false;
                        ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
                        String transport = reqViaHeader.getTransport();
                        if (transport.equals("TCP")) {
                            isTcp = true;
                        }
                        device.setTransport(isTcp ? "TCP" : "UDP");
                    }
                }
            }
            ServerTransaction serverTransaction = getServerTransaction(evt);
            serverTransaction.sendResponse(response);
            if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
            // 注册成功
            // 保存到redis
            // 下发catelog查询目录
            if (registerFlag == 1 ) {
                logger.info("[{}] 注册成功! deviceId:" + device.getDeviceId(), requestAddress);
                publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER);
                // 重新注册更新设备和通道,以免设备替换或更新后信息无法更新
                handler.onRegister(device);
            } else if (registerFlag == 2) {
                logger.info("[{}] 注销成功! deviceId:" + device.getDeviceId(), requestAddress);
                publisher.outlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_OUTLINE_UNREGISTER);
            }
        } catch (SipException | InvalidArgumentException | NoSuchAlgorithmException | ParseException e) {
            e.printStackTrace();
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
New file
@@ -0,0 +1,75 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.header.ExpiresHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
/**
 * @description:SUBSCRIBE请求处理器
 * @author: swwheihei
 * @date:   2020年5月3日 下午5:31:20
 */
@Component
public class SubscribeRequestProcessor extends SIPRequestProcessorAbstract {
    private Logger logger = LoggerFactory.getLogger(SubscribeRequestProcessor.class);
    private String method = "SUBSCRIBE";
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
        sipProcessorObserver.addRequestProcessor(method, this);
    }
    /**
     * 处理SUBSCRIBE请求
     *
     * @param evt
     */
    @Override
    public void process(RequestEvent evt) {
        Request request = evt.getRequest();
        try {
            Response response = null;
            response = getMessageFactory().createResponse(200, request);
            if (response != null) {
                ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
                response.setExpires(expireHeader);
            }
            logger.info("response : " + response.toString());
            ServerTransaction transaction = getServerTransaction(evt);
            if (transaction != null) {
                transaction.sendResponse(response);
                transaction.getDialog().delete();
                transaction.terminate();
            } else {
                logger.info("processRequest serverTransactionId is null.");
            }
        } catch (ParseException e) {
            e.printStackTrace();
        } catch (SipException e) {
            e.printStackTrace();
        } catch (InvalidArgumentException e) {
            e.printStackTrace();
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/ISIPResponseProcessor.java
New file
@@ -0,0 +1,15 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.response;
import javax.sip.ResponseEvent;
/**
 * @description:处理接收IPCamera发来的SIP协议响应消息
 * @author: swwheihei
 * @date:   2020年5月3日 下午4:42:22
 */
public interface ISIPResponseProcessor {
    void process(ResponseEvent evt);
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java
New file
@@ -0,0 +1,49 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.ResponseEvent;
/**
 * @description: BYE请求响应器
 * @author: swwheihei
 * @date:   2020年5月3日 下午5:32:05
 */
@Component
public class ByeResponseProcessor extends SIPResponseProcessorAbstract {
    private String method = "BYE";
    @Autowired
    private SipLayer sipLayer;
    @Autowired
    private SipConfig config;
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
        sipProcessorObserver.addResponseProcessor(method, this);
    }
    /**
     * 处理BYE响应
     *
     * @param evt
     */
    @Override
    public void process(ResponseEvent evt) {
        // TODO Auto-generated method stub
        System.out.println("收到bye");
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java
New file
@@ -0,0 +1,47 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.ResponseEvent;
/**
 * @description: CANCEL响应处理器
 * @author: panlinlin
 * @date:   2021年11月5日 16:35
 */
@Component
public class CancelResponseProcessor extends SIPResponseProcessorAbstract {
    private String method = "CANCEL";
    @Autowired
    private SipLayer sipLayer;
    @Autowired
    private SipConfig config;
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
        sipProcessorObserver.addResponseProcessor(method, this);
    }
    /**
     * 处理CANCEL响应
     *
     * @param evt
     */
    @Override
    public void process(ResponseEvent evt) {
        // TODO Auto-generated method stub
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java
New file
@@ -0,0 +1,97 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import gov.nist.javax.sip.ResponseEventExt;
import gov.nist.javax.sip.stack.SIPDialog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.CSeqHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
/**
 * @description: 处理INVITE响应
 * @author: panlinlin
 * @date: 2021年11月5日 16:40
 */
@Component
public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
    private final static Logger logger = LoggerFactory.getLogger(InviteResponseProcessor.class);
    private String method = "INVITE";
    @Autowired
    private SipLayer sipLayer;
    @Autowired
    private SipConfig config;
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
        sipProcessorObserver.addResponseProcessor(method, this);
    }
    @Autowired
    private VideoStreamSessionManager streamSession;
    /**
     * 处理invite响应
     *
     * @param evt 响应消息
     * @throws ParseException
     */
    @Override
    public void process(ResponseEvent evt ){
        try {
            Response response = evt.getResponse();
            int statusCode = response.getStatusCode();
            // trying不会回复
            if (statusCode == Response.TRYING) {
            }
            // 成功响应
            // 下发ack
            if (statusCode == Response.OK) {
                ResponseEventExt event = (ResponseEventExt)evt;
                SIPDialog dialog = (SIPDialog)evt.getDialog();
                CSeqHeader cseq = (CSeqHeader) response.getHeader(CSeqHeader.NAME);
                Request reqAck = dialog.createAck(cseq.getSeqNumber());
                SipURI requestURI = (SipURI) reqAck.getRequestURI();
                try {
                    requestURI.setHost(event.getRemoteIpAddress());
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                requestURI.setPort(event.getRemotePort());
                reqAck.setRequestURI(requestURI);
                logger.info("向 " + event.getRemoteIpAddress() + ":" + event.getRemotePort() + "回复ack");
                SipURI sipURI = (SipURI)dialog.getRemoteParty().getURI();
                String deviceId = requestURI.getUser();
                String channelId = sipURI.getUser();
                dialog.sendAck(reqAck);
            }
        } catch (InvalidArgumentException | SipException e) {
            e.printStackTrace();
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
New file
@@ -0,0 +1,104 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.ResponseEvent;
import javax.sip.header.CallIdHeader;
import javax.sip.header.WWWAuthenticateHeader;
import javax.sip.message.Response;
/**
 * @description:Register响应处理器
 * @author: swwheihei
 * @date:   2020年5月3日 下午5:32:23
 */
@Component
public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
    private Logger logger = LoggerFactory.getLogger(RegisterResponseProcessor.class);
    private String method = "REGISTER";
    @Autowired
    private ISIPCommanderForPlatform sipCommanderForPlatform;
    @Autowired
    private IVideoManagerStorager storager;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
        sipProcessorObserver.addResponseProcessor(method, this);
    }
    /**
     * 处理Register响应
     *
      * @param evt 事件
     */
    @Override
    public void process(ResponseEvent evt) {
        Response response = evt.getResponse();
        CallIdHeader callIdHeader = (CallIdHeader) response.getHeader(CallIdHeader.NAME);
        String callId = callIdHeader.getCallId();
        String platformGBId = redisCatchStorage.queryPlatformRegisterInfo(callId);
        if (platformGBId == null) {
            logger.info(String.format("未找到callId: %s 的注册/注销平台id", callId ));
            return;
        }
        ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformGBId);
        if (parentPlatformCatch == null) {
            logger.warn(String.format("收到 %s 的注册/注销%S请求, 但是平台缓存信息未查询到!!!", platformGBId, response.getStatusCode()));
            return;
        }
        String action = parentPlatformCatch.getParentPlatform().getExpires().equals("0") ? "注销" : "注册";
        logger.info(String.format("收到 %s %s的%S响应", platformGBId, action, response.getStatusCode() ));
        ParentPlatform parentPlatform = parentPlatformCatch.getParentPlatform();
        if (parentPlatform == null) {
            logger.warn(String.format("收到 %s %s的%S请求, 但是平台信息未查询到!!!", platformGBId, action, response.getStatusCode()));
            return;
        }
        if (response.getStatusCode() == 401) {
            WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME);
            sipCommanderForPlatform.register(parentPlatform, callId, www, null, null);
        }else if (response.getStatusCode() == 200){
            // 注册/注销成功
            logger.info(String.format("%s %s成功", platformGBId, action));
            redisCatchStorage.delPlatformRegisterInfo(callId);
            parentPlatform.setStatus("注册".equals(action));
            // 取回Expires设置,避免注销过程中被置为0
            ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId);
            String expires = parentPlatformTmp.getExpires();
            parentPlatform.setExpires(expires);
            parentPlatform.setId(parentPlatformTmp.getId());
            storager.updateParentPlatformStatus(platformGBId, "注册".equals(action));
            redisCatchStorage.updatePlatformRegister(parentPlatform);
            redisCatchStorage.updatePlatformKeepalive(parentPlatform);
            parentPlatformCatch.setParentPlatform(parentPlatform);
            redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java
New file
@@ -0,0 +1,24 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.Device;
/**
 * 设备相关业务处理
 */
public interface IDeviceService {
    /**
     * 添加目录订阅
     * @param device 设备信息
     * @return
     */
    boolean addCatalogSubscribe(Device device);
    /**
     * 移除目录订阅
     * @param device 设备信息
     * @return
     */
    boolean removeCatalogSubscribe(Device device);
}
src/main/java/com/genersoft/iot/vmp/service/bean/CatalogSubscribeTask.java
New file
@@ -0,0 +1,48 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sip.ResponseEvent;
public class CatalogSubscribeTask implements Runnable{
    private final Logger logger = LoggerFactory.getLogger(CatalogSubscribeTask.class);
    private  Device device;
    private  ISIPCommander sipCommander;
    public CatalogSubscribeTask(Device device, ISIPCommander sipCommander) {
        this.device = device;
        this.sipCommander = sipCommander;
    }
    @Override
    public void run() {
        sipCommander.catalogSubscribe(device, eventResult -> {
            ResponseEvent event = (ResponseEvent) eventResult.event;
            Element rootElement = null;
            try {
                rootElement = XmlUtil.getRootElement(event.getResponse().getRawContent(), "gb2312");
            } catch (DocumentException e) {
                e.printStackTrace();
            }
            Element resultElement = rootElement.element("Result");
            String result = resultElement.getText();
            if (result.toUpperCase().equals("OK")){
                // 成功
                logger.info("目录订阅成功: {}", device.getDeviceId());
            }else {
                // 失败
                logger.info("目录订阅失败: {}-{}", device.getDeviceId(), result);
            }
        },eventResult -> {
            // 失败
            logger.warn("目录订阅失败: {}-信令发送失败", device.getDeviceId());
        });
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
New file
@@ -0,0 +1,61 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.bean.CatalogSubscribeTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class DeviceServiceImpl implements IDeviceService {
    private final static Logger logger = LoggerFactory.getLogger(DeviceServiceImpl.class);
    @Autowired
    private DynamicTask dynamicTask;
;
    @Autowired
    private ISIPCommander sipCommander;
    @Override
    public boolean addCatalogSubscribe(Device device) {
        if (device == null || device.getSubscribeCycleForCatalog() < 0) {
            return false;
        }
        // 添加目录订阅
        CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander);
        catalogSubscribeTask.run();
        // 提前开始刷新订阅
        String cron = getCron(device.getSubscribeCycleForCatalog() - 60);
        dynamicTask.startCron(device.getDeviceId(), catalogSubscribeTask, cron);
        return true;
    }
    @Override
    public boolean removeCatalogSubscribe(Device device) {
        if (device == null || device.getSubscribeCycleForCatalog() < 0) {
            return false;
        }
        dynamicTask.stopCron(device.getDeviceId());
        return true;
    }
    public String getCron(int time) {
        if (time <= 59) {
            return "0/" + time +" * * * * ?";
        }else if (time <= 60* 59) {
            int minute = time/(60);
            return "0 0/" + minute +" * * * ?";
        }else if (time <= 60* 60* 59) {
            int hour = time/(60*60);
            return "0 0 0/" + hour +" * * ?";
        }else {
            return "0 0/10 * * * ?";
        }
    }
}