648540858
2022-11-17 6b8ecd1f9d2abe1e6ac0af858487755a58a2643a
优化级联注册稳定性
7个文件已修改
66 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
@@ -127,11 +127,15 @@
    /**
     * Sweeps the task registry: each key whose future reports done or cancelled is
     * removed from both {@code futureMap} and {@code runnableMap}.
     */
    public void execute(){
        if (futureMap.size() > 0) {
            // NOTE(review): entries are removed from futureMap while its keySet is being
            // iterated; that is only safe if futureMap is a concurrent map — confirm.
            for (String key : futureMap.keySet()) {
                if (futureMap.get(key).isDone()) {
                if (futureMap.get(key).isDone() || futureMap.get(key).isCancelled()) {
                    futureMap.remove(key);
                    runnableMap.remove(key);
                }
            }
        }
    }
    public boolean isAlive(String key) {
        return futureMap.get(key) != null && !futureMap.get(key).isDone() && !futureMap.get(key).isCancelled();
    }
}
src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java
@@ -2,7 +2,6 @@
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -47,7 +46,7 @@
            parentPlatformCatch.setId(parentPlatform.getServerGBId());
            redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
            // 设置所有平台离线
            platformService.offline(parentPlatform);
            platformService.offline(parentPlatform, true);
            // 取消订阅
            sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{
                platformService.login(parentPlatform);
src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java
@@ -5,11 +5,9 @@
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @description: 录像查询结束事件
@@ -22,13 +20,12 @@
    private final static Logger logger = LoggerFactory.getLogger(RecordEndEventListener.class);
    private static Map<String, SseEmitter> sseEmitters = new Hashtable<>();
    public interface RecordEndEventHandler{
        void  handler(RecordInfo recordInfo);
    }
    private Map<String, RecordEndEventHandler> handlerMap = new HashMap<>();
    private Map<String, RecordEndEventHandler> handlerMap = new ConcurrentHashMap<>();
    @Override
    public void onApplicationEvent(RecordEndEvent event) {
        logger.info("录像查询完成事件触发,deviceId:{}, channelId: {}, 录像数量{}条", event.getRecordInfo().getDeviceId(),
@@ -38,7 +35,6 @@
                recordEndEventHandler.handler(event.getRecordInfo());
            }
        }
    }
    public void addEndEventHandler(String device, String channelId, RecordEndEventHandler recordEndEventHandler) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
@@ -100,7 +100,7 @@
            if (platformRegisterInfo.isRegister()) {
                platformService.online(parentPlatform);
            }else {
                platformService.offline(parentPlatform);
                platformService.offline(parentPlatform, false);
            }
            // 注册/注销成功移除缓存的信息
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
@@ -169,7 +169,6 @@
                    .build();
            Response response = client.newCall(request).execute();
            if (response.isSuccessful()) {
                logger.info("response body contentType: " + Objects.requireNonNull(response.body()).contentType());
                if (targetPath != null) {
                    File snapFolder = new File(targetPath);
                    if (!snapFolder.exists()) {
src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java
@@ -1,12 +1,7 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.github.pagehelper.PageInfo;
import java.util.List;
/**
 * 国标平台的业务类
@@ -40,7 +35,7 @@
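     * Take the platform offline.
     * @param parentPlatform platform information
     * @param stopRegisterTask whether to also stop the scheduled registration task
     */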
    void offline(ParentPlatform parentPlatform);
    void offline(ParentPlatform parentPlatform, boolean stopRegisterTask);
    /**
     * 向上级平台发起注册
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -22,7 +22,6 @@
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import javax.sip.TimeoutEvent;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
@@ -131,20 +130,23 @@
        }
        final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
        if (dynamicTask.contains(registerTaskKey)) {
            dynamicTask.stop(registerTaskKey);
        }
        // 添加注册任务
        dynamicTask.startDelay(registerTaskKey,
        if (!dynamicTask.isAlive(registerTaskKey)) {
            // 添加注册任务
            dynamicTask.startCron(registerTaskKey,
                // 注册失败(注册成功时由程序直接调用了online方法)
                ()-> {
                    try {
                        commanderForPlatform.register(parentPlatform, eventResult -> offline(parentPlatform),null);
                        logger.info("[国标级联] 平台:{}注册即将到期,重新注册", parentPlatform.getServerGBId());
                        commanderForPlatform.register(parentPlatform, eventResult -> {
                            offline(parentPlatform, false);
                        },null);
                    } catch (InvalidArgumentException | ParseException | SipException e) {
                        logger.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage());
                    }
                },
                (parentPlatform.getExpires() - 10) *1000);
        }
        final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
        if (!dynamicTask.contains(keepaliveTaskKey)) {
@@ -160,16 +162,11 @@
                                    // 此时是第三次心跳超时, 平台离线
                                    if (platformCatch.getKeepAliveReply()  == 2) {
                                        // 设置平台离线,并重新注册
                                        offline(parentPlatform);
                                        logger.info("[国标级联] {},三次心跳超时后再次发起注册", parentPlatform.getServerGBId());
                                        try {
                                            commanderForPlatform.register(parentPlatform, eventResult1 -> {
                                                logger.info("[国标级联] {},三次心跳超时后再次发起注册仍然失败,开始定时发起注册,间隔为1分钟", parentPlatform.getServerGBId());
                                                // 添加注册任务
                                                dynamicTask.startCron(registerTaskKey,
                                                        // 注册失败(注册成功时由程序直接调用了online方法)
                                                        ()->logger.info("[国标级联] {},平台离线后持续发起注册,失败", parentPlatform.getServerGBId()),
                                                        60*1000);
                                                offline(parentPlatform, false);
                                            }, null);
                                        } catch (InvalidArgumentException | ParseException | SipException e) {
                                            logger.error("[命令发送失败] 国标级联 注册: {}", e.getMessage());
@@ -198,7 +195,7 @@
    }
    @Override
    public void offline(ParentPlatform parentPlatform) {
    public void offline(ParentPlatform parentPlatform, boolean stopRegister) {
        logger.info("[平台离线]:{}", parentPlatform.getServerGBId());
        ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
        parentPlatformCatch.setKeepAliveReply(0);
@@ -212,11 +209,13 @@
        // 停止所有推流
        logger.info("[平台离线] {}, 停止所有推流", parentPlatform.getServerGBId());
        stopAllPush(parentPlatform.getServerGBId());
        // 清除注册定时
        logger.info("[平台离线] {}, 停止定时注册任务", parentPlatform.getServerGBId());
        final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
        if (dynamicTask.contains(registerTaskKey)) {
            dynamicTask.stop(registerTaskKey);
        if (stopRegister) {
            // 清除注册定时
            logger.info("[平台离线] {}, 停止定时注册任务", parentPlatform.getServerGBId());
            final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
            if (dynamicTask.contains(registerTaskKey)) {
                dynamicTask.stop(registerTaskKey);
            }
        }
        // 清除心跳定时
        logger.info("[平台离线] {}, 停止定时发送心跳任务", parentPlatform.getServerGBId());