mk1990
2022-04-18 22e1d92a9d8ae6aa257889f882722b8e48648abc
Merge branch '648540858:wvp-28181-2.0' into wvp-28181-2.0
24个文件已修改
1个文件已添加
1个文件已删除
789 ■■■■■ 已修改文件
bootstrap.sh 91 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java 56 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java 69 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java 52 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java 44 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java 46 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
web_src/src/components/dialog/SyncChannelProgress.vue 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bootstrap.sh
New file
@@ -0,0 +1,91 @@
#!/bin/bash
######################################################
# Copyright 2019 Pham Ngoc Hoai
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Repo: https://github.com/tyrion9/spring-boot-startup-script
#
######### PARAM ######################################
JAVA_OPT=-Xmx1024m
JARFILE=`ls -1r *.jar 2>/dev/null | head -n 1`
PID_FILE=pid.file
RUNNING=N
PWD=`pwd`
######### DO NOT MODIFY ########
if [ -f $PID_FILE ]; then
        PID=`cat $PID_FILE`
        if [ ! -z "$PID" ] && kill -0 $PID 2>/dev/null; then
                RUNNING=Y
        fi
fi
start()
{
        if [ $RUNNING == "Y" ]; then
                echo "Application already started"
        else
                if [ -z "$JARFILE" ]
                then
                        echo "ERROR: jar file not found"
                else
                        nohup java  $JAVA_OPT -Djava.security.egd=file:/dev/./urandom -jar $PWD/$JARFILE > nohup.out 2>&1  &
                        echo $! > $PID_FILE
                        echo "Application $JARFILE starting..."
                        tail -f nohup.out
                fi
        fi
}
stop()
{
        if [ $RUNNING == "Y" ]; then
                kill -9 $PID
                rm -f $PID_FILE
                echo "Application stopped"
        else
                echo "Application not running"
        fi
}
restart()
{
        stop
        start
}
case "$1" in
        'start')
                start
                ;;
        'stop')
                stop
                ;;
        'restart')
                restart
                ;;
        *)
                echo "Usage: $0 {  start | stop | restart  }"
                exit 1
                ;;
esac
exit 0
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
@@ -1,6 +1,9 @@
package com.genersoft.iot.vmp.conf;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@@ -18,6 +21,8 @@
@Component
public class DynamicTask {
    private Logger logger = LoggerFactory.getLogger(DynamicTask.class);
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;
@@ -26,7 +31,12 @@
    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        return new ThreadPoolTaskScheduler();
        ThreadPoolTaskScheduler schedulerPool = new ThreadPoolTaskScheduler();
        schedulerPool.setPoolSize(300);
        schedulerPool.setWaitForTasksToCompleteOnShutdown(true);
        schedulerPool.setAwaitTerminationSeconds(10);
        return schedulerPool;
    }
    /**
@@ -37,11 +47,24 @@
     * @return
     */
    public void startCron(String key, Runnable task, int cycleForCatalog) {
        stop(key);
        ScheduledFuture future = futureMap.get(key);
        if (future != null) {
            if (future.isCancelled()) {
                logger.info("任务【{}】已存在但是关闭状态!!!", key);
            } else {
                logger.info("任务【{}】已存在且已启动!!!", key);
                return;
            }
        }
        // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
        ScheduledFuture future = threadPoolTaskScheduler.scheduleWithFixedDelay(task, cycleForCatalog * 1000L);
        futureMap.put(key, future);
        runnableMap.put(key, task);
        future = threadPoolTaskScheduler.scheduleAtFixedRate(task, cycleForCatalog * 1000L);
        if (future != null){
            futureMap.put(key, future);
            runnableMap.put(key, task);
            logger.info("任务【{}】启动成功!!!", key);
        }else {
            logger.info("任务【{}】启动失败!!!", key);
        }
    }
    /**
@@ -54,9 +77,25 @@
    public void startDelay(String key, Runnable task, int delay) {
        stop(key);
        Date starTime = new Date(System.currentTimeMillis() + delay);
        ScheduledFuture future = futureMap.get(key);
        if (future != null) {
            if (future.isCancelled()) {
                logger.info("任务【{}】已存在但是关闭状态!!!", key);
            } else {
                logger.info("任务【{}】已存在且已启动!!!", key);
                return;
            }
        }
        // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
        ScheduledFuture future = threadPoolTaskScheduler.schedule(task, starTime);
        futureMap.put(key, future);
        future = threadPoolTaskScheduler.schedule(task, starTime);
        if (future != null){
            futureMap.put(key, future);
            runnableMap.put(key, task);
            logger.info("任务【{}】启动成功!!!", key);
        }else {
            logger.info("任务【{}】启动失败!!!", key);
        }
    }
    public void stop(String key) {
@@ -78,4 +117,7 @@
        return futureMap.keySet();
    }
    public Runnable get(String key) {
        return runnableMap.get(key);
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -48,6 +48,7 @@
        properties.setProperty("javax.sip.STACK_NAME", "GB28181_SIP");
        properties.setProperty("javax.sip.IP_ADDRESS", sipConfig.getMonitorIp());
        properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "true");
        properties.setProperty("gov.nist.javax.sip.DELIVER_UNSOLICITED_NOTIFY", "true"); // 接收所有notify请求,即使没有订阅
        /**
         * sip_server_log.log 和 sip_debug_log.log public static final int TRACE_NONE =
         * 0; public static final int TRACE_MESSAGES = 16; public static final int
src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java
@@ -4,6 +4,7 @@
import java.util.List;
public class CatalogData {
    private int sn; // 命令序列号
    private int total;
    private List<DeviceChannel> channelList;
    private Date lastTime;
@@ -15,6 +16,15 @@
    }
    private CatalogDataStatus status;
    public int getSn() {
        return sn;
    }
    public void setSn(int sn) {
        this.sn = sn;
    }
    public int getTotal() {
        return total;
    }
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
@@ -1,5 +1,12 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
@@ -9,12 +16,32 @@
@Component
public class SubscribeHolder {
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private ISIPCommanderForPlatform sipCommanderForPlatform;
    @Autowired
    private IVideoManagerStorage storager;
    private final String taskOverduePrefix = "subscribe_overdue_";
    private static ConcurrentHashMap<String, SubscribeInfo> catalogMap = new ConcurrentHashMap<>();
    private static ConcurrentHashMap<String, SubscribeInfo> mobilePositionMap = new ConcurrentHashMap<>();
    public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) {
        catalogMap.put(platformId, subscribeInfo);
        // 添加订阅到期
        String taskOverdueKey = taskOverduePrefix +  "catalog_" + platformId;
        dynamicTask.stop(taskOverdueKey);
        // 添加任务处理订阅过期
        dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()),
                subscribeInfo.getExpires() * 1000);
    }
    public SubscribeInfo getCatalogSubscribe(String platformId) {
@@ -23,10 +50,24 @@
    public void removeCatalogSubscribe(String platformId) {
        catalogMap.remove(platformId);
        String taskOverdueKey = taskOverduePrefix +  "catalog_" + platformId;
        // 添加任务处理订阅过期
        dynamicTask.stop(taskOverdueKey);
    }
    public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo) {
        mobilePositionMap.put(platformId, subscribeInfo);
        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX +  "MobilePosition_" + platformId;
        // 添加任务处理GPS定时推送
        dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, subscribeInfo.getSn(), key, this), subscribeInfo.getGpsInterval());
        String taskOverdueKey = taskOverduePrefix +  "MobilePosition_" + platformId;
        dynamicTask.stop(taskOverdueKey);
        // 添加任务处理订阅过期
        dynamicTask.startDelay(taskOverdueKey, () -> {
                    System.out.println("订阅过期");
                    removeMobilePositionSubscribe(subscribeInfo.getId());
                },
                subscribeInfo.getExpires() * 1000);
    }
    public SubscribeInfo getMobilePositionSubscribe(String platformId) {
@@ -35,6 +76,12 @@
    public void removeMobilePositionSubscribe(String platformId) {
        mobilePositionMap.remove(platformId);
        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX +  "MobilePosition_" + platformId;
        // 结束任务处理GPS定时推送
        dynamicTask.stop(key);
        String taskOverdueKey = taskOverduePrefix +  "MobilePosition_" + platformId;
        // 添加任务处理订阅过期
        dynamicTask.stop(taskOverdueKey);
    }
    public List<String> getAllCatalogSubscribePlatform() {
@@ -48,7 +95,7 @@
    }
    public void removeAllSubscribe(String platformId) {
        mobilePositionMap.remove(platformId);
        catalogMap.remove(platformId);
        removeMobilePositionSubscribe(platformId);
        removeCatalogSubscribe(platformId);
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java
@@ -33,6 +33,14 @@
    private ServerTransaction transaction;
    private Dialog dialog;
    /**
     * 以下为可选字段
     * @return
     */
    private String sn;
    private int gpsInterval;
    public String getId() {
        return id;
    }
@@ -88,4 +96,20 @@
    public void setDialog(Dialog dialog) {
        this.dialog = dialog;
    }
    public String getSn() {
        return sn;
    }
    public void setSn(String sn) {
        this.sn = sn;
    }
    public int getGpsInterval() {
        return gpsInterval;
    }
    public void setGpsInterval(int gpsInterval) {
        this.gpsInterval = gpsInterval;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java
@@ -54,6 +54,7 @@
    @Autowired
    private SIPCommander cmder;
    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Override
@@ -76,7 +77,7 @@
            if (deviceInStore == null) { //第一次上线
                logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId());
                cmder.deviceInfoQuery(device);
                cmder.catalogQuery(device, null);
                deviceService.sync(device);
            }
            break;
        // 设备主动发送心跳触发的在线事件
@@ -99,7 +100,10 @@
        storager.updateDevice(device);
        // 上线添加订阅
        if (device.getSubscribeCycleForCatalog() > 0) {
            // 查询在线设备那些开启了订阅,为设备开启定时的目录订阅
            deviceService.addCatalogSubscribe(device);
        }
        if (device.getSubscribeCycleForMobilePosition() > 0) {
            deviceService.addMobilePositionSubscribe(device);
        }
    }
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java
File was deleted
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
@@ -61,8 +61,6 @@
        if (event.getPlatformId() != null) {
            parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformId());
            if (parentPlatform != null && !parentPlatform.isStatus())return;
            String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() +  "_Catalog_" + event.getPlatformId();
//            subscribe = redisCatchStorage.getSubscribe(key);
            subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
            if (subscribe == null) {
src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java
@@ -26,28 +26,35 @@
    @Autowired
    private IVideoManagerStorage storager;
    public void addReady(String key) {
        CatalogData catalogData = data.get(key);
    public void addReady(Device device, int sn ) {
        CatalogData catalogData = data.get(device.getDeviceId());
        if (catalogData == null || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) {
            catalogData = new CatalogData();
            catalogData.setChannelList(new ArrayList<>());
            catalogData.setDevice(device);
            catalogData.setSn(sn);
            catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
            catalogData.setLastTime(new Date(System.currentTimeMillis()));
            data.put(key, catalogData);
            data.put(device.getDeviceId(), catalogData);
        }
    }
    public void put(String key, int total, Device device, List<DeviceChannel> deviceChannelList) {
        CatalogData catalogData = data.get(key);
    public void put(String deviceId, int sn, int total, Device device, List<DeviceChannel> deviceChannelList) {
        CatalogData catalogData = data.get(deviceId);
        if (catalogData == null) {
            catalogData = new CatalogData();
            catalogData.setSn(sn);
            catalogData.setTotal(total);
            catalogData.setDevice(device);
            catalogData.setChannelList(new ArrayList<>());
            catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
            catalogData.setLastTime(new Date(System.currentTimeMillis()));
            data.put(key, catalogData);
            data.put(deviceId, catalogData);
        }else {
            // 同一个设备的通道同步请求只考虑一个,其他的直接忽略
            if (catalogData.getSn() != sn) {
                return;
            }
            catalogData.setTotal(total);
            catalogData.setDevice(device);
            catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
@@ -56,30 +63,26 @@
        }
    }
    public List<DeviceChannel> get(String key) {
        CatalogData catalogData = data.get(key);
    public List<DeviceChannel> get(String deviceId) {
        CatalogData catalogData = data.get(deviceId);
        if (catalogData == null) return null;
        return catalogData.getChannelList();
    }
    public int getTotal(String key) {
        CatalogData catalogData = data.get(key);
    public int getTotal(String deviceId) {
        CatalogData catalogData = data.get(deviceId);
        if (catalogData == null) return 0;
        return catalogData.getTotal();
    }
    public SyncStatus getSyncStatus(String key) {
        CatalogData catalogData = data.get(key);
    public SyncStatus getSyncStatus(String deviceId) {
        CatalogData catalogData = data.get(deviceId);
        if (catalogData == null) return null;
        SyncStatus syncStatus = new SyncStatus();
        syncStatus.setCurrent(catalogData.getChannelList().size());
        syncStatus.setTotal(catalogData.getTotal());
        syncStatus.setErrorMsg(catalogData.getErrorMsg());
        return syncStatus;
    }
    public void del(String key) {
        data.remove(key);
    }
    @Scheduled(fixedRate = 5 * 1000)   //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
@@ -92,23 +95,30 @@
        Calendar calendarBefore30S = Calendar.getInstance();
        calendarBefore30S.setTime(new Date());
        calendarBefore30S.set(Calendar.SECOND, calendarBefore30S.get(Calendar.SECOND) - 30);
        for (String key : keys) {
            CatalogData catalogData = data.get(key);
            if (catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据
                storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
                String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条";
        for (String deviceId : keys) {
            CatalogData catalogData = data.get(deviceId);
            if ( catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据
                if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
                    storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
                    if (catalogData.getTotal() != catalogData.getChannelList().size()) {
                        String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条";
                        catalogData.setErrorMsg(errorMsg);
                    }
                }else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) {
                    String errorMsg = "同步失败,等待回复超时";
                    catalogData.setErrorMsg(errorMsg);
                }
                catalogData.setStatus(CatalogData.CatalogDataStatus.end);
                catalogData.setErrorMsg(errorMsg);
            }
            if (catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除
                data.remove(key);
            if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除
                data.remove(deviceId);
            }
        }
    }
    public void setChannelSyncEnd(String key, String errorMsg) {
        CatalogData catalogData = data.get(key);
    public void setChannelSyncEnd(String deviceId, String errorMsg) {
        CatalogData catalogData = data.get(deviceId);
        if (catalogData == null)return;
        catalogData.setStatus(CatalogData.CatalogDataStatus.end);
        catalogData.setErrorMsg(errorMsg);
src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java
@@ -1,5 +1,9 @@
package com.genersoft.iot.vmp.gb28181.task;
import javax.sip.DialogState;
public interface ISubscribeTask extends Runnable{
    void stop();
    DialogState getDialogState();
}
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java
@@ -5,6 +5,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import javax.sip.Dialog;
import javax.sip.DialogState;
@@ -72,4 +73,10 @@
            });
        }
    }
    @Override
    public DialogState getDialogState() {
        if (dialog == null) return null;
        return dialog.getState();
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
@@ -1,16 +1,16 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import java.text.SimpleDateFormat;
import javax.sip.DialogState;
import java.util.List;
/**
@@ -18,20 +18,21 @@
 */
public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
    private Logger logger = LoggerFactory.getLogger(MobilePositionSubscribeHandlerTask.class);
    private IRedisCatchStorage redisCatchStorage;
    private IVideoManagerStorage storager;
    private ISIPCommanderForPlatform sipCommanderForPlatform;
    private SubscribeHolder subscribeHolder;
    private String platformId;
    private ParentPlatform platform;
    private String sn;
    private String key;
    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    public MobilePositionSubscribeHandlerTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorage storager, String platformId, String sn, String key, SubscribeHolder subscribeInfo) {
        System.out.println("MobilePositionSubscribeHandlerTask 初始化");
        this.redisCatchStorage = redisCatchStorage;
        this.storager = storager;
        this.platformId = platformId;
        this.platform = storager.queryParentPlatByServerGBId(platformId);
        this.sn = sn;
        this.key = key;
        this.sipCommanderForPlatform = sipCommanderForPlatform;
@@ -41,30 +42,31 @@
    @Override
    public void run() {
        SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platformId);
        if (platform == null) return;
        SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId());
        if (subscribe != null) {
            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
            if (parentPlatform == null || parentPlatform.isStatus()) {
                // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
                List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platformId);
                if (gbStreams.size() > 0) {
                    for (GbStream gbStream : gbStreams) {
                        String gbId = gbStream.getGbId();
                        GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
                        if (gpsMsgInfo != null) {
                            // 发送GPS消息
                            sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
                        }else {
                            // 没有在redis找到新的消息就使用数据库的消息
                            gpsMsgInfo = new GPSMsgInfo();
                            gpsMsgInfo.setId(gbId);
                            gpsMsgInfo.setLat(gbStream.getLongitude());
                            gpsMsgInfo.setLng(gbStream.getLongitude());
                            // 发送GPS消息
                            sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
                        }
//            if (!parentPlatform.isStatus()) {
//                logger.info("发送订阅时发现平台已经离线:{}", platformId);
//                return;
//            }
            // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
            List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platform.getServerGBId());
            if (gbStreams.size() == 0) {
                logger.info("发送订阅时发现平台已经没有关联的直播流:{}", platform.getServerGBId());
                return;
            }
            for (GbStream gbStream : gbStreams) {
                String gbId = gbStream.getGbId();
                GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
                if (gpsMsgInfo != null) { // 无最新位置不发送
                    logger.info("无最新位置不发送");
                    // 经纬度都为0不发送
                    if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) {
                        continue;
                    }
                    // 发送GPS消息
                    sipCommanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, subscribe);
                }
            }
        }
@@ -74,4 +76,9 @@
    public void stop() {
    }
    @Override
    public DialogState getDialogState() {
        return null;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java
@@ -6,10 +6,13 @@
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.ResponseEvent;
import java.util.Timer;
import java.util.TimerTask;
/**
 * 移动位置订阅的定时更新
@@ -20,6 +23,8 @@
    private  ISIPCommander sipCommander;
    private Dialog dialog;
    private Timer timer ;
    public MobilePositionSubscribeTask(Device device, ISIPCommander sipCommander) {
        this.device = device;
        this.sipCommander = sipCommander;
@@ -27,10 +32,14 @@
    @Override
    public void run() {
        if (timer != null ) {
            timer.cancel();
            timer = null;
        }
        sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> {
            if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
                dialog = eventResult.dialog;
            }
//            if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
//                dialog = eventResult.dialog;
//            }
            ResponseEvent event = (ResponseEvent) eventResult.event;
            if (event.getResponse().getRawContent() != null) {
                // 成功
@@ -43,6 +52,13 @@
            dialog = null;
            // 失败
            logger.warn("[移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
            timer = new Timer();
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    MobilePositionSubscribeTask.this.run();
                }
            }, 2000);
        });
    }
@@ -56,6 +72,10 @@
         * COMPLETED-> Completed Dialog状态-已完成
         * TERMINATED-> Terminated Dialog状态-终止
         */
        if (timer != null ) {
            timer.cancel();
            timer = null;
        }
        if (dialog != null && dialog.getState().equals(DialogState.CONFIRMED)) {
            logger.info("取消移动订阅时dialog状态为{}", dialog.getState());
            device.setSubscribeCycleForMobilePosition(0);
@@ -74,4 +94,9 @@
            });
        }
    }
    @Override
    public DialogState getDialogState() {
        if (dialog == null) return null;
        return dialog.getState();
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
@@ -250,7 +250,7 @@
     * 
     * @param device 视频设备
     */
    boolean catalogQuery(Device device, SipSubscribe.Event errorEvent);
    boolean catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent);
    
    /**
     * 查询录像信息
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -1208,14 +1208,14 @@
     * @param device 视频设备
     */ 
    @Override
    public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) {
    public boolean catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent) {
        try {
            StringBuffer catalogXml = new StringBuffer(200);
            String charset = device.getCharset();
            catalogXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n");
            catalogXml.append("<Query>\r\n");
            catalogXml.append("<CmdType>Catalog</CmdType>\r\n");
            catalogXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
            catalogXml.append("<SN>" + sn + "</SN>\r\n");
            catalogXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
            catalogXml.append("</Query>\r\n");
            
@@ -1566,17 +1566,28 @@
            cmdXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
            cmdXml.append("</Query>\r\n");
            String tm = Long.toString(System.currentTimeMillis());
            CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            Request request;
            if (dialog != null) {
                logger.info("发送目录订阅消息时 dialog的状态为: {}", dialog.getState());
                request = dialog.createRequest(Request.SUBSCRIBE);
                ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
                request.setContent(cmdXml.toString(), contentTypeHeader);
                ExpiresHeader expireHeader = sipFactory.createHeaderFactory().createExpiresHeader(device.getSubscribeCycleForMobilePosition());
                request.addHeader(expireHeader);
            }else {
                String tm = Long.toString(System.currentTimeMillis());
            // 有效时间默认为60秒以上
            Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm,
                    "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" ,
                    callIdHeader);
                CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                        : udpSipProvider.getNewCallId();
                // 有效时间默认为60秒以上
                request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm,
                        "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" ,
                        callIdHeader);
            }
            transmitRequest(device, request, errorEvent, okEvent);
            return true;
        } catch ( NumberFormatException | ParseException | InvalidArgumentException    | SipException e) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -385,7 +385,7 @@
        if (parentPlatform == null) {
            return false;
        }
        logger.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
        try {
            String characterSet = parentPlatform.getCharacterSet();
            StringBuffer deviceStatusXml = new StringBuffer(600);
@@ -405,7 +405,7 @@
            CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            callIdHeader.setCallId(subscribeInfo.getCallId());
            logger.info("[发送Notify-MobilePosition] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
            sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> {
                logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
            }, null);
@@ -459,7 +459,7 @@
         // 设置编码, 防止中文乱码
        messageFactory.setDefaultContentEncodingCharset(characterSet);
        Dialog dialog  = subscribeInfo.getDialog();
        if (dialog == null) return;
        if (dialog == null || !dialog.getState().equals(DialogState.CONFIRMED)) return;
        SIPRequest notifyRequest = (SIPRequest)dialog.createRequest(Request.NOTIFY);
        ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
        notifyRequest.setContent(catalogXmlContent, contentTypeHeader);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -146,7 +146,7 @@
            } else {
                mobilePosition.setAltitude(0.0);
            }
            logger.info("[收到Notify-MobilePosition]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
            logger.info("[收到 移动位置订阅]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
                    mobilePosition.getLongitude(), mobilePosition.getLatitude());
            mobilePosition.setReportSource("Mobile Position");
            BaiduPoint bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude()));
@@ -281,7 +281,7 @@
                    Element eventElement = itemDevice.element("Event");
                    DeviceChannel channel = XmlUtil.channelContentHander(itemDevice);
                    channel.setDeviceId(device.getDeviceId());
                    logger.info("[收到Notify-Catalog]:{}/{}", device.getDeviceId(), channel.getChannelId());
                    logger.info("[收到 目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
                    switch (eventElement.getText().toUpperCase()) {
                        case CatalogEvent.ON: // 上线
                            logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId());
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
@@ -149,8 +149,7 @@
            subscribeInfo.setDialog(dialog);
        }
        String sn = XmlUtil.getText(rootElement, "SN");
        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() +  "_MobilePosition_" + platformId;
        logger.info("[notify-MobilePosition]: {}", platformId);
        logger.info("[回复 移动位置订阅]: {}", platformId);
        StringBuilder resultXml = new StringBuilder(200);
        resultXml.append("<?xml version=\"1.0\" ?>\r\n")
                .append("<Response>\r\n")
@@ -161,14 +160,25 @@
                .append("</Response>\r\n");
        if (subscribeInfo.getExpires() > 0) {
            if (subscribeHolder.getMobilePositionSubscribe(platformId) != null) {
                dynamicTask.stop(key);
            }
            String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
            dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, sn, key, subscribeHolder), Integer.parseInt(interval) -1 );
            if (interval == null) {
                subscribeInfo.setGpsInterval(5);
            }else {
                subscribeInfo.setGpsInterval(Integer.parseInt(interval));
            }
            subscribeInfo.setSn(sn);
            subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
//            if (subscribeHolder.getMobilePositionSubscribe(platformId) == null ) {
//                subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
//            }else {
//                if (subscribeHolder.getMobilePositionSubscribe(platformId).getDialog() != null
//                        && subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState() != null
//                        && !subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState().equals(DialogState.CONFIRMED)) {
//                    subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
//                }
//            }
        }else if (subscribeInfo.getExpires() == 0) {
            dynamicTask.stop(key);
            subscribeHolder.removeMobilePositionSubscribe(platformId);
        }
@@ -202,8 +212,7 @@
            subscribeInfo.setDialog(dialog);
        }
        String sn = XmlUtil.getText(rootElement, "SN");
        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() +  "_Catalog_" + platformId;
        logger.info("[notify-Catalog]: {}", platformId);
        logger.info("[回复 目录订阅]: {}/{}", platformId, deviceID);
        StringBuilder resultXml = new StringBuilder(200);
        resultXml.append("<?xml version=\"1.0\" ?>\r\n")
                .append("<Response>\r\n")
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
@@ -86,23 +86,17 @@
            rootElement = getRootElement(evt, device.getCharset());
            Element deviceListElement = rootElement.element("DeviceList");
            Element sumNumElement = rootElement.element("SumNum");
            if (sumNumElement == null || deviceListElement == null) {
            Element snElement = rootElement.element("SN");
            if (snElement == null || sumNumElement == null || deviceListElement == null) {
                responseAck(evt, Response.BAD_REQUEST, "xml error");
                return;
            }
            int sumNum = Integer.parseInt(sumNumElement.getText());
            if (sumNum == 0) {
                // 数据已经完整接收
                storager.cleanChannelsForDevice(device.getDeviceId());
                RequestMessage msg = new RequestMessage();
                msg.setKey(key);
                WVPResult<Object> result = new WVPResult<>();
                result.setCode(0);
                result.setData(device);
                msg.setData(result);
                result.setMsg("更新成功,共0条");
                deferredResultHolder.invokeAllResult(msg);
                catalogDataCatch.del(key);
                catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null);
            }else {
                Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
                if (deviceListIterator != null) {
@@ -123,31 +117,22 @@
                        channelList.add(deviceChannel);
                    }
                    int sn = Integer.parseInt(snElement.getText());
                    logger.info("收到来自设备【{}】的通道: {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.get(key) == null ? 0 :catalogDataCatch.get(key).size(), sumNum);
                    catalogDataCatch.put(key, sumNum, device, channelList);
                    if (catalogDataCatch.get(key).size() == sumNum) {
                    catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device, channelList);
                    if (catalogDataCatch.get(device.getDeviceId()).size() == sumNum) {
                        // 数据已经完整接收
                        boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key));
                        RequestMessage msg = new RequestMessage();
                        msg.setKey(key);
                        WVPResult<Object> result = new WVPResult<>();
                        result.setCode(0);
                        result.setData(device);
                        if (resetChannelsResult || sumNum ==0) {
                            result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条");
                        boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(device.getDeviceId()));
                        if (!resetChannelsResult) {
                            String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(device.getDeviceId()).size() + "条";
                            catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), errorMsg);
                        }else {
                            result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条");
                            catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null);
                        }
                        msg.setData(result);
                        deferredResultHolder.invokeAllResult(msg);
                        catalogDataCatch.del(key);
                    }
                }
                // 回复200 OK
                responseAck(evt, Response.OK);
                if (offLineDetector.isOnline(device.getDeviceId())) {
                    publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
                }
            }
        } catch (DocumentException e) {
            e.printStackTrace();
@@ -231,21 +216,18 @@
    }
    public SyncStatus getChannelSyncProgress(String deviceId) {
        String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
        if (catalogDataCatch.get(key) == null) {
        if (catalogDataCatch.get(deviceId) == null) {
            return null;
        }else {
            return catalogDataCatch.getSyncStatus(key);
            return catalogDataCatch.getSyncStatus(deviceId);
        }
    }
    public void setChannelSyncReady(String deviceId) {
        String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
        catalogDataCatch.addReady(key);
    public void setChannelSyncReady(Device device, int sn) {
        catalogDataCatch.addReady(device, sn);
    }
    public void setChannelSyncEnd(String deviceId, String errorMsg) {
        String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
        catalogDataCatch.setChannelSyncEnd(key, errorMsg);
        catalogDataCatch.setChannelSyncEnd(deviceId, errorMsg);
    }
}
src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java
@@ -44,15 +44,8 @@
    SyncStatus getChannelSyncStatus(String deviceId);
    /**
     * 设置通道同步状态
     * @param deviceId 设备ID
     * 通道同步
     * @param device
     */
    void setChannelSyncReady(String deviceId);
    /**
     * 设置同步结束
     * @param deviceId 设备ID
     * @param errorMsg 错误信息
     */
    void setChannelSyncEnd(String deviceId, String errorMsg);
    void sync(Device device);
}
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -14,6 +14,8 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.sip.DialogState;
/**
 * 设备业务(目录订阅)
 */
@@ -39,19 +41,17 @@
        if (device == null || device.getSubscribeCycleForCatalog() < 0) {
            return false;
        }
        if (dynamicTask.contains(device.getDeviceId() + "catalog")) {
            // 存在则停止现有的,开启新的
            dynamicTask.stop(device.getDeviceId() + "catalog");
        CatalogSubscribeTask task = (CatalogSubscribeTask)dynamicTask.get(device.getDeviceId() + "catalog");
        if (task != null && task.getDialogState() != null && task.getDialogState().equals(DialogState.CONFIRMED)) { // 已存在不需要再次添加
            return true;
        }
        logger.info("[添加目录订阅] 设备{}", device.getDeviceId());
        // 添加目录订阅
        CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander);
        catalogSubscribeTask.run();
        // 提前开始刷新订阅
        int subscribeCycleForCatalog = device.getSubscribeCycleForCatalog();
        int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForCatalog(),30);
        // 设置最小值为30
        subscribeCycleForCatalog = Math.max(subscribeCycleForCatalog, 30);
        dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, subscribeCycleForCatalog);
        dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, subscribeCycleForCatalog -1);
        return true;
    }
@@ -70,18 +70,16 @@
        if (device == null || device.getSubscribeCycleForMobilePosition() < 0) {
            return false;
        }
        if (dynamicTask.contains(device.getDeviceId() + "mobile_position")) {
            // 存在则停止现有的,开启新的
            dynamicTask.stop(device.getDeviceId() + "mobile_position");
        }
        logger.info("[添加移动位置订阅] 设备{}", device.getDeviceId());
        MobilePositionSubscribeTask task = (MobilePositionSubscribeTask)dynamicTask.get(device.getDeviceId() + "mobile_position");
        if (task != null &&  task.getDialogState() != null && task.getDialogState().equals(DialogState.CONFIRMED)) { // 已存在不需要再次添加
            return true;
        }
        // 添加目录订阅
        MobilePositionSubscribeTask mobilePositionSubscribeTask = new MobilePositionSubscribeTask(device, sipCommander);
        mobilePositionSubscribeTask.run();
        // 提前开始刷新订阅
        int subscribeCycleForCatalog = device.getSubscribeCycleForCatalog();
        // 设置最小值为30
        subscribeCycleForCatalog = Math.max(subscribeCycleForCatalog, 30);
        int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForMobilePosition(),30);
        dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, subscribeCycleForCatalog -1 );
        return true;
    }
@@ -102,12 +100,16 @@
    }
    @Override
    public void setChannelSyncReady(String deviceId) {
        catalogResponseMessageHandler.setChannelSyncReady(deviceId);
    }
    @Override
    public void setChannelSyncEnd(String deviceId, String errorMsg) {
        catalogResponseMessageHandler.setChannelSyncEnd(deviceId, errorMsg);
    public void sync(Device device) {
        if (catalogResponseMessageHandler.getChannelSyncProgress(device.getDeviceId()) != null) {
            logger.info("开启同步时发现同步已经存在");
            return;
        }
        int sn = (int)((Math.random()*9+1)*100000);
        catalogResponseMessageHandler.setChannelSyncReady(device, sn);
        sipCommander.catalogQuery(device, sn, event -> {
            String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);
            catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg);
        });
    }
}
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
@@ -238,12 +238,15 @@
    @Override
    public boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList) {
        if (deviceChannelList == null) {
            return false;
        }
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        // 数据去重
        List<DeviceChannel> channels = new ArrayList<>();
        StringBuilder stringBuilder = new StringBuilder();
        Map<String, Integer> subContMap = new HashMap<>();
        if (deviceChannelList.size() > 1) {
        if (deviceChannelList != null && deviceChannelList.size() > 1) {
            // 数据去重
            Set<String> gbIdSet = new HashSet<>();
            for (DeviceChannel deviceChannel : deviceChannelList) {
@@ -300,6 +303,7 @@
            dataSourceTransactionManager.commit(transactionStatus);     //手动提交
            return true;
        }catch (Exception e) {
            e.printStackTrace();
            dataSourceTransactionManager.rollback(transactionStatus);
            return false;
        }
@@ -415,10 +419,9 @@
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        boolean result = false;
        try {
            if (platformChannelMapper.delChannelForDeviceId(deviceId) <0  // 删除与国标平台的关联
                    || deviceChannelMapper.cleanChannelsByDeviceId(deviceId) < 0 // 删除他的通道
                    || deviceMapper.del(deviceId) < 0 // 移除设备信息
            ) {
            platformChannelMapper.delChannelForDeviceId(deviceId);
            deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
            if ( deviceMapper.del(deviceId) < 0 ) {
                //事务回滚
                dataSourceTransactionManager.rollback(transactionStatus);
            }
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
@@ -4,8 +4,13 @@
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
@@ -29,9 +34,8 @@
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import javax.sip.DialogState;
import java.util.*;
@Api(tags = "国标设备查询", value = "国标设备查询")
@SuppressWarnings("rawtypes")
@@ -62,6 +66,9 @@
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private SubscribeHolder subscribeHolder;
    /**
     * 使用ID查询国标设备
@@ -165,12 +172,8 @@
            wvpResult.setData(syncStatus);
            return wvpResult;
        }
        SyncStatus syncStatusReady = new SyncStatus();
        deviceService.setChannelSyncReady(deviceId);
        cmder.catalogQuery(device, event -> {
            String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);
            deviceService.setChannelSyncEnd(deviceId, errorMsg);
        });
        deviceService.sync(device);
        WVPResult<SyncStatus> wvpResult = new WVPResult<>();
        wvpResult.setCode(0);
        wvpResult.setMsg("开始同步");
@@ -469,4 +472,29 @@
        }
        return wvpResult;
    }
    @GetMapping("/{deviceId}/subscribe_info")
    @ApiOperation(value = "获取设备的订阅状态", notes = "获取设备的订阅状态")
    public WVPResult<Map<String, String>> getSubscribeInfo(@PathVariable String deviceId) {
        Set<String> allKeys = dynamicTask.getAllKeys();
        Map<String, String> dialogStateMap = new HashMap<>();
        for (String key : allKeys) {
            if (key.startsWith(deviceId)) {
                ISubscribeTask subscribeTask = (ISubscribeTask)dynamicTask.get(key);
                DialogState dialogState = subscribeTask.getDialogState();
                if (dialogState == null) {
                    continue;
                }
                if (subscribeTask instanceof CatalogSubscribeTask) {
                    dialogStateMap.put("catalog", dialogState.toString());
                }else if (subscribeTask instanceof MobilePositionSubscribeTask) {
                    dialogStateMap.put("mobilePosition", dialogState.toString());
                }
            }
        }
        WVPResult<Map<String, String>> wvpResult = new WVPResult<>();
        wvpResult.setCode(0);
        wvpResult.setData(dialogStateMap);
        return wvpResult;
    }
}
src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java
@@ -4,6 +4,7 @@
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.VManageBootstrap;
import com.genersoft.iot.vmp.common.VersionPo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.VersionInfo;
@@ -27,6 +28,7 @@
import javax.sip.SipProvider;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@SuppressWarnings("rawtypes")
@Api(tags = "服务控制")
@@ -42,13 +44,16 @@
    private IMediaServerService mediaServerService;
    @Autowired
    VersionInfo versionInfo;
    private VersionInfo versionInfo;
    @Autowired
    SipConfig sipConfig;
    private SipConfig sipConfig;
    @Autowired
    UserSetting userSetting;
    private UserSetting userSetting;
    @Autowired
    private DynamicTask dynamicTask;
    @Value("${server.port}")
    private int serverPort;
@@ -248,4 +253,35 @@
        result.setData(jsonObject);
        return result;
    }
//    @ApiOperation("当前进行中的动态任务")
//    @GetMapping(value = "/dynamicTask")
//    @ResponseBody
//    public WVPResult<JSONObject> getDynamicTask(){
//        WVPResult<JSONObject> result = new WVPResult<>();
//        result.setCode(0);
//        result.setMsg("success");
//
//        JSONObject jsonObject = new JSONObject();
//
//        Set<String> allKeys = dynamicTask.getAllKeys();
//        jsonObject.put("server.port", serverPort);
//        if (StringUtils.isEmpty(type)) {
//            jsonObject.put("sip", JSON.toJSON(sipConfig));
//            jsonObject.put("base", JSON.toJSON(userSetting));
//        }else {
//            switch (type){
//                case "sip":
//                    jsonObject.put("sip", sipConfig);
//                    break;
//                case "base":
//                    jsonObject.put("base", userSetting);
//                    break;
//                default:
//                    break;
//            }
//        }
//        result.setData(jsonObject);
//        return result;
//    }
}
web_src/src/components/dialog/SyncChannelProgress.vue
@@ -61,23 +61,36 @@
          if (!this.syncFlag) {
            this.syncFlag = true;
          }
          if (res.data.data == null) {
            this.syncStatus = "success"
            this.percentage = 100;
            this.msg = '同步成功';
          }else if (res.data.data.total == 0){
            this.msg = `等待同步中`;
            this.timmer = setTimeout(this.getProgress, 300)
          }else if (res.data.data.errorMsg !== null ){
            this.msg = res.data.data.errorMsg;
            this.syncStatus = "exception"
          }else {
            this.total = res.data.data.total;
            this.current = res.data.data.current;
            this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100;
            this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`;
            this.timmer = setTimeout(this.getProgress, 300)
          if (res.data.data != null) {
            if (res.data.data.total == 0) {
              if (res.data.data.errorMsg !== null ){
                this.msg = res.data.data.errorMsg;
                this.syncStatus = "exception"
              }else {
                this.msg = `等待同步中`;
                this.timmer = setTimeout(this.getProgress, 300)
              }
            }else  {
              if (res.data.data.total == res.data.data.current) {
                this.syncStatus = "success"
                this.percentage = 100;
                this.msg = '同步成功';
              }else {
                if (res.data.data.errorMsg !== null ){
                  this.msg = res.data.data.errorMsg;
                  this.syncStatus = "exception"
                }else {
                  this.total = res.data.data.total;
                  this.current = res.data.data.current;
                  this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100;
                  this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`;
                  this.timmer = setTimeout(this.getProgress, 300)
                }
              }
            }
          }
        }else {
          if (this.syncFlag) {
            this.syncStatus = "success"