648540858
2022-04-13 efc4a7bc8eb8a967198d70ff4d88670e71541164
优化级联移动位置订阅位置更新
11个文件已修改
186 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java 27 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
@@ -39,7 +39,7 @@
    /**
     * Registers {@code task} under {@code key} as a periodically scheduled job.
     * Any job previously registered under the same key is stopped first via {@code stop(key)}.
     *
     * @param key             identifier under which the future and the runnable are stored
     * @param task            the runnable handed to the scheduler
     * @param cycleForCatalog execution interval; multiplied by 1000 before being passed to the
     *                        scheduler (presumably seconds converted to milliseconds - confirm)
     */
    public void startCron(String key, Runnable task, int cycleForCatalog) {
        stop(key);
        // scheduleWithFixedDelay waits for the previous run to finish before it starts counting the period;
        // cycleForCatalog is the interval between executions.
        // NOTE(review): this view lists both scheduling calls below (looks like diff residue) - confirm which one the file keeps.
        ScheduledFuture future = threadPoolTaskScheduler.scheduleWithFixedDelay(task, cycleForCatalog * 1000L);
        ScheduledFuture future = threadPoolTaskScheduler.scheduleAtFixedRate(task, cycleForCatalog * 1000L);
        futureMap.put(key, future);
        runnableMap.put(key, task);
    }
@@ -78,4 +78,7 @@
        return futureMap.keySet();
    }
    public Runnable get(String key) {
        return runnableMap.get(key);
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java
@@ -99,7 +99,10 @@
        storager.updateDevice(device);
        // 上线添加订阅
        if (device.getSubscribeCycleForCatalog() > 0) {
            // 查询在线设备那些开启了订阅,为设备开启定时的目录订阅
            deviceService.addCatalogSubscribe(device);
        }
        if (device.getSubscribeCycleForMobilePosition() > 0) {
            deviceService.addMobilePositionSubscribe(device);
        }
    }
src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java
@@ -1,5 +1,9 @@
package com.genersoft.iot.vmp.gb28181.task;
import javax.sip.DialogState;
public interface ISubscribeTask extends Runnable{
    void stop();
    DialogState getDialogState();
}
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java
@@ -5,6 +5,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import javax.sip.Dialog;
import javax.sip.DialogState;
@@ -45,6 +46,7 @@
        });
    }
    @Async
    @Override
    public void stop() {
        /**
@@ -72,4 +74,10 @@
            });
        }
    }
    @Override
    public DialogState getDialogState() {
        if (dialog == null) return null;
        return dialog.getState();
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
@@ -1,22 +1,24 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import java.text.SimpleDateFormat;
import javax.sip.DialogState;
import java.util.List;
/**
 * 向已经订阅(移动位置)的上级发送MobilePosition消息
 */
public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
    private Logger logger = LoggerFactory.getLogger(MobilePositionSubscribeHandlerTask.class);
    private IRedisCatchStorage redisCatchStorage;
    private IVideoManagerStorage storager;
@@ -25,8 +27,6 @@
    private String platformId;
    private String sn;
    private String key;
    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    public MobilePositionSubscribeHandlerTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorage storager, String platformId, String sn, String key, SubscribeHolder subscribeInfo) {
        this.redisCatchStorage = redisCatchStorage;
@@ -38,40 +38,51 @@
        this.subscribeHolder = subscribeInfo;
    }
    @Async
    @Override
    public void run() {
        logger.info("执行MobilePositionSubscribeHandlerTask");
        SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platformId);
        if (subscribe != null) {
            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
            if (parentPlatform == null || parentPlatform.isStatus()) {
                // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
                List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platformId);
                if (gbStreams.size() > 0) {
                    for (GbStream gbStream : gbStreams) {
                        String gbId = gbStream.getGbId();
                        GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
                        if (gpsMsgInfo != null) {
                            // 发送GPS消息
                            sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
                        }else {
                            // 没有在redis找到新的消息就使用数据库的消息
                            gpsMsgInfo = new GPSMsgInfo();
                            gpsMsgInfo.setId(gbId);
                            gpsMsgInfo.setLat(gbStream.getLongitude());
                            gpsMsgInfo.setLng(gbStream.getLongitude());
                            // 发送GPS消息
                            sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
                        }
            if (parentPlatform == null ) {
                logger.info("发送订阅时未找到平台信息:{}", platformId);
                return;
            }
            if (!parentPlatform.isStatus()) {
                logger.info("发送订阅时发现平台已经离线:{}", platformId);
                return;
            }
            // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
            List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platformId);
            if (gbStreams.size() == 0) {
                logger.info("发送订阅时发现平台已经没有关联的直播流:{}", platformId);
                return;
            }
            for (GbStream gbStream : gbStreams) {
                String gbId = gbStream.getGbId();
                GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
                if (gpsMsgInfo != null) { // 无最新位置不发送
                    // 经纬度都为0不发送
                    if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) {
                        continue;
                    }
                    // 发送GPS消息
                    sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
                }
            }
        }
        logger.info("结束执行MobilePositionSubscribeHandlerTask");
    }
    @Override
    public void stop() {
    }
    @Override
    public DialogState getDialogState() {
        return null;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java
@@ -6,6 +6,7 @@
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import javax.sip.Dialog;
import javax.sip.DialogState;
@@ -25,6 +26,7 @@
        this.sipCommander = sipCommander;
    }
    @Async
    @Override
    public void run() {
        sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> {
@@ -74,4 +76,9 @@
            });
        }
    }
    @Override
    public DialogState getDialogState() {
        if (dialog == null) return null;
        return dialog.getState();
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -1566,17 +1566,28 @@
            cmdXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
            cmdXml.append("</Query>\r\n");
            String tm = Long.toString(System.currentTimeMillis());
            CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            Request request;
            if (dialog != null) {
                logger.info("发送目录订阅消息时 dialog的状态为: {}", dialog.getState());
                request = dialog.createRequest(Request.SUBSCRIBE);
                ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
                request.setContent(cmdXml.toString(), contentTypeHeader);
                ExpiresHeader expireHeader = sipFactory.createHeaderFactory().createExpiresHeader(device.getSubscribeCycleForMobilePosition());
                request.addHeader(expireHeader);
            }else {
                String tm = Long.toString(System.currentTimeMillis());
            // 有效时间默认为60秒以上
            Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm,
                    "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" ,
                    callIdHeader);
                CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                        : udpSipProvider.getNewCallId();
                // 有效时间默认为60秒以上
                request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm,
                        "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" ,
                        callIdHeader);
            }
            transmitRequest(device, request, errorEvent, okEvent);
            return true;
        } catch ( NumberFormatException | ParseException | InvalidArgumentException    | SipException e) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -405,7 +405,7 @@
            CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            callIdHeader.setCallId(subscribeInfo.getCallId());
            logger.info("[发送Notify-MobilePosition] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
            logger.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
            sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> {
                logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
            }, null);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -146,7 +146,7 @@
            } else {
                mobilePosition.setAltitude(0.0);
            }
            logger.info("[收到Notify-MobilePosition]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
            logger.info("[收到 移动位置订阅]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
                    mobilePosition.getLongitude(), mobilePosition.getLatitude());
            mobilePosition.setReportSource("Mobile Position");
            BaiduPoint bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude()));
@@ -281,7 +281,7 @@
                    Element eventElement = itemDevice.element("Event");
                    DeviceChannel channel = XmlUtil.channelContentHander(itemDevice);
                    channel.setDeviceId(device.getDeviceId());
                    logger.info("[收到Notify-Catalog]:{}/{}", device.getDeviceId(), channel.getChannelId());
                    logger.info("[收到 目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
                    switch (eventElement.getText().toUpperCase()) {
                        case CatalogEvent.ON: // 上线
                            logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId());
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
@@ -150,7 +150,7 @@
        }
        String sn = XmlUtil.getText(rootElement, "SN");
        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() +  "_MobilePosition_" + platformId;
        logger.info("[notify-MobilePosition]: {}", platformId);
        logger.info("[回复 移动位置订阅]: {}", platformId);
        StringBuilder resultXml = new StringBuilder(200);
        resultXml.append("<?xml version=\"1.0\" ?>\r\n")
                .append("<Response>\r\n")
@@ -161,12 +161,21 @@
                .append("</Response>\r\n");
        if (subscribeInfo.getExpires() > 0) {
            if (subscribeHolder.getMobilePositionSubscribe(platformId) != null) {
                dynamicTask.stop(key);
            if (subscribeHolder.getMobilePositionSubscribe(platformId) == null ) {
                String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
                subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
                dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, sn, key, subscribeHolder), Integer.parseInt(interval));
            }else {
                if (subscribeHolder.getMobilePositionSubscribe(platformId).getDialog() != null
                        && subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState() != null
                        && !subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState().equals(DialogState.CONFIRMED)) {
                    dynamicTask.stop(key);
                    String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
                    subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
                    dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, sn, key, subscribeHolder), Integer.parseInt(interval));
                }
            }
            String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
            dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, sn, key, subscribeHolder), Integer.parseInt(interval) -1 );
            subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
        }else if (subscribeInfo.getExpires() == 0) {
            dynamicTask.stop(key);
            subscribeHolder.removeMobilePositionSubscribe(platformId);
@@ -203,7 +212,7 @@
        }
        String sn = XmlUtil.getText(rootElement, "SN");
        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() +  "_Catalog_" + platformId;
        logger.info("[notify-Catalog]: {}", platformId);
        logger.info("[回复 目录订阅]: {}/{}", platformId, deviceID);
        StringBuilder resultXml = new StringBuilder(200);
        resultXml.append("<?xml version=\"1.0\" ?>\r\n")
                .append("<Response>\r\n")
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
@@ -4,8 +4,13 @@
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
@@ -29,9 +34,8 @@
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import javax.sip.DialogState;
import java.util.*;
@Api(tags = "国标设备查询", value = "国标设备查询")
@SuppressWarnings("rawtypes")
@@ -62,6 +66,9 @@
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private SubscribeHolder subscribeHolder;
    /**
     * 使用ID查询国标设备
@@ -469,4 +476,29 @@
        }
        return wvpResult;
    }
    @GetMapping("/{deviceId}/subscribe_info")
    @ApiOperation(value = "获取设备的订阅状态", notes = "获取设备的订阅状态")
    public WVPResult<Map<String, String>> getSubscribeInfo(@PathVariable String deviceId) {
        Set<String> allKeys = dynamicTask.getAllKeys();
        Map<String, String> dialogStateMap = new HashMap<>();
        for (String key : allKeys) {
            if (key.startsWith(deviceId)) {
                ISubscribeTask subscribeTask = (ISubscribeTask)dynamicTask.get(key);
                DialogState dialogState = subscribeTask.getDialogState();
                if (dialogState == null) {
                    continue;
                }
                if (subscribeTask instanceof CatalogSubscribeTask) {
                    dialogStateMap.put("catalog", dialogState.toString());
                }else if (subscribeTask instanceof MobilePositionSubscribeTask) {
                    dialogStateMap.put("mobilePosition", dialogState.toString());
                }
            }
        }
        WVPResult<Map<String, String>> wvpResult = new WVPResult<>();
        wvpResult.setCode(0);
        wvpResult.setData(dialogStateMap);
        return wvpResult;
    }
}