winfed
2023-06-09 a2d93fce811acc83ad5ff0b4a93403db22795a10
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -4,6 +4,7 @@
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@@ -11,8 +12,8 @@
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.storager.dao.*;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
@@ -42,7 +43,19 @@
    private ParentPlatformMapper platformMapper;
    @Autowired
    private PlatformCatalogMapper catalogMapper;
    @Autowired
    private PlatformChannelMapper platformChannelMapper;
    @Autowired
    private PlatformGbStreamMapper platformGbStreamMapper;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
    private IMediaServerService mediaServerService;
@@ -113,36 +126,107 @@
    }
    @Override
    public void online(ParentPlatform parentPlatform) {
        logger.info("[国标级联]:{}, 平台上线/更新注册", parentPlatform.getServerGBId());
    public boolean update(ParentPlatform parentPlatform) {
        logger.info("[国标级联]更新平台 {}", parentPlatform.getDeviceGBId());
        parentPlatform.setCharacterSet(parentPlatform.getCharacterSet().toUpperCase());
        ParentPlatform parentPlatformOld = platformMapper.getParentPlatById(parentPlatform.getId());
        ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatformOld.getServerGBId());
        parentPlatform.setUpdateTime(DateUtil.getNow());
        if (!parentPlatformOld.getTreeType().equals(parentPlatform.getTreeType())) {
            // 目录结构发生变化,清空之前的关联关系
            logger.info("保存平台{}时发现目录结构变化,清空关联关系", parentPlatform.getDeviceGBId());
            catalogMapper.delByPlatformId(parentPlatformOld.getServerGBId());
            platformChannelMapper.delByPlatformId(parentPlatformOld.getServerGBId());
            platformGbStreamMapper.delByPlatformId(parentPlatformOld.getServerGBId());
        }
        // 停止心跳定时
        final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatformOld.getServerGBId();
        dynamicTask.stop(keepaliveTaskKey);
        // 停止注册定时
        final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatformOld.getServerGBId();
        dynamicTask.stop(registerTaskKey);
        // 注销旧的
        try {
            if (parentPlatformOld.isStatus()) {
                logger.info("保存平台{}时发现救平台在线,发送注销命令", parentPlatformOld.getServerGBId());
                commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> {
                    logger.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId());
                });
            }
        } catch (InvalidArgumentException | ParseException | SipException e) {
            logger.error("[命令发送失败] 国标级联 注销: {}", e.getMessage());
        }
        // 更新数据库
        if (parentPlatform.getCatalogGroup() == 0) {
            parentPlatform.setCatalogGroup(1);
        }
        if (parentPlatform.getAdministrativeDivision() == null) {
            parentPlatform.setAdministrativeDivision(parentPlatform.getAdministrativeDivision());
        }
        platformMapper.updateParentPlatform(parentPlatform);
        // 更新redis
        redisCatchStorage.delPlatformCatchInfo(parentPlatformOld.getServerGBId());
        ParentPlatformCatch parentPlatformCatch = new ParentPlatformCatch();
        parentPlatformCatch.setParentPlatform(parentPlatform);
        parentPlatformCatch.setId(parentPlatform.getServerGBId());
        redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
        // 注册
        if (parentPlatform.isEnable()) {
            // 保存时启用就发送注册
            // 注册成功时由程序直接调用了online方法
            try {
                logger.info("[国标级联] 平台注册 {}", parentPlatform.getDeviceGBId());
                commanderForPlatform.register(parentPlatform, eventResult -> {
                    logger.info("[国标级联] {},添加向上级注册失败,请确定上级平台可用时重新保存", parentPlatform.getServerGBId());
                }, null);
            } catch (InvalidArgumentException | ParseException | SipException e) {
                logger.error("[命令发送失败] 国标级联: {}", e.getMessage());
            }
        }
        // 重新开启定时注册, 使用续订消息
        // 重新开始心跳保活
        return false;
    }
    @Override
    public void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo) {
        logger.info("[国标级联]:{}, 平台上线", parentPlatform.getServerGBId());
        platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true);
        ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
        if (parentPlatformCatch != null) {
            parentPlatformCatch.getParentPlatform().setStatus(true);
            redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
        }else {
        if (parentPlatformCatch == null) {
            parentPlatformCatch = new ParentPlatformCatch();
            parentPlatformCatch.setParentPlatform(parentPlatform);
            parentPlatformCatch.setId(parentPlatform.getServerGBId());
            parentPlatform.setStatus(true);
            parentPlatformCatch.setParentPlatform(parentPlatform);
            redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
        }
        parentPlatformCatch.getParentPlatform().setStatus(true);
        parentPlatformCatch.setSipTransactionInfo(sipTransactionInfo);
        redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
        final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
        if (!dynamicTask.isAlive(registerTaskKey)) {
            logger.info("[国标级联]:{}, 添加定时注册任务", parentPlatform.getServerGBId());
            // 添加注册任务
            dynamicTask.startCron(registerTaskKey,
                // 注册失败(注册成功时由程序直接调用了online方法)
                ()-> {
                    registerTask(parentPlatform);
                },
                (parentPlatform.getExpires() - 10) *1000);
                ()-> registerTask(parentPlatform, sipTransactionInfo),
                    parentPlatform.getExpires() * 1000);
        }
        final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
        if (!dynamicTask.contains(keepaliveTaskKey)) {
            logger.info("[国标级联]:{}, 添加定时心跳任务", parentPlatform.getServerGBId());
            // 添加心跳任务
            dynamicTask.startCron(keepaliveTaskKey,
                    ()-> {
@@ -174,7 +258,7 @@
                                // 心跳成功
                                // 清空之前的心跳超时计数
                                ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
                                if (platformCatch.getKeepAliveReply() > 0) {
                                if (platformCatch != null && platformCatch.getKeepAliveReply() > 0) {
                                    platformCatch.setKeepAliveReply(0);
                                    redisCatchStorage.updatePlatformCatchInfo(platformCatch);
                                }
@@ -183,11 +267,11 @@
                            logger.error("[命令发送失败] 国标级联 发送心跳: {}", e.getMessage());
                        }
                    },
                    (parentPlatform.getKeepTimeout() - 10)*1000);
                    (parentPlatform.getKeepTimeout())*1000);
        }
    }
    private void registerTask(ParentPlatform parentPlatform){
    private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){
        try {
            // 设置超时重发, 后续从底层支持消息重发
            String key = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId() + "_timeout";
@@ -195,10 +279,10 @@
                return;
            }
            dynamicTask.startDelay(key, ()->{
                registerTask(parentPlatform);
                registerTask(parentPlatform, sipTransactionInfo);
            }, 1000);
            logger.info("[国标级联] 平台:{}注册即将到期,重新注册", parentPlatform.getServerGBId());
            commanderForPlatform.register(parentPlatform, eventResult -> {
            logger.info("[国标级联] 平台:{}注册即将到期,开始续订", parentPlatform.getServerGBId());
            commanderForPlatform.register(parentPlatform, sipTransactionInfo,  eventResult -> {
                dynamicTask.stop(key);
                offline(parentPlatform, false);
            },eventResult -> {
@@ -248,6 +332,7 @@
        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(platformId);
        if (sendRtpItems != null && sendRtpItems.size() > 0) {
            for (SendRtpItem sendRtpItem : sendRtpItems) {
                ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
                redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Map<String, Object> param = new HashMap<>(3);