648540858
2023-08-07 6905f00293cc50d236e159ec39f0af577a72ae44
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,18 +1,19 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.storager.dao.*;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
@@ -20,7 +21,9 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.sip.TimeoutEvent;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,6 +35,8 @@
public class PlatformServiceImpl implements IPlatformService {
    private final static String REGISTER_KEY_PREFIX = "platform_register_";
    private final static String REGISTER_FAIL_AGAIN_KEY_PREFIX = "platform_register_fail_again_";
    private final static String KEEPALIVE_KEY_PREFIX = "platform_keepalive_";
    private final static Logger logger = LoggerFactory.getLogger(PlatformServiceImpl.class);
@@ -40,7 +45,19 @@
    private ParentPlatformMapper platformMapper;
    @Autowired
    private PlatformCatalogMapper catalogMapper;
    @Autowired
    private PlatformChannelMapper platformChannelMapper;
    @Autowired
    private PlatformGbStreamMapper platformGbStreamMapper;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
    private IMediaServerService mediaServerService;
@@ -52,10 +69,16 @@
    private DynamicTask dynamicTask;
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private SubscribeHolder subscribeHolder;
    @Autowired
    private GbStreamMapper gbStreamMapper;
    @Autowired
    private UserSetting userSetting;
@@ -93,83 +116,173 @@
        if (parentPlatform.isEnable()) {
            // 保存时启用就发送注册
            // 注册成功时由程序直接调用了online方法
            commanderForPlatform.register(parentPlatform, eventResult -> {
                logger.info("[国标级联] {},添加向上级注册失败,请确定上级平台可用时重新保存", parentPlatform.getServerGBId());
            }, null);
            try {
                commanderForPlatform.register(parentPlatform, eventResult -> {
                    logger.info("[国标级联] {},添加向上级注册失败,请确定上级平台可用时重新保存", parentPlatform.getServerGBId());
                }, null);
            } catch (InvalidArgumentException | ParseException | SipException e) {
                logger.error("[命令发送失败] 国标级联: {}", e.getMessage());
            }
        }
        return result > 0;
    }
    @Override
    public void online(ParentPlatform parentPlatform) {
        logger.info("[国标级联]:{}, 平台上线/更新注册", parentPlatform.getServerGBId());
    public boolean update(ParentPlatform parentPlatform) {
        logger.info("[国标级联]更新平台 {}", parentPlatform.getDeviceGBId());
        parentPlatform.setCharacterSet(parentPlatform.getCharacterSet().toUpperCase());
        ParentPlatform parentPlatformOld = platformMapper.getParentPlatById(parentPlatform.getId());
        ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatformOld.getServerGBId());
        parentPlatform.setUpdateTime(DateUtil.getNow());
        // 停止心跳定时
        final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatformOld.getServerGBId();
        dynamicTask.stop(keepaliveTaskKey);
        // 停止注册定时
        final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatformOld.getServerGBId();
        dynamicTask.stop(registerTaskKey);
        // 注销旧的
        try {
            if (parentPlatformOld.isStatus()) {
                logger.info("保存平台{}时发现旧平台在线,发送注销命令", parentPlatformOld.getServerGBId());
                commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> {
                    logger.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId());
                });
            }
        } catch (InvalidArgumentException | ParseException | SipException e) {
            logger.error("[命令发送失败] 国标级联 注销: {}", e.getMessage());
        }
        // 更新数据库
        if (parentPlatform.getCatalogGroup() == 0) {
            parentPlatform.setCatalogGroup(1);
        }
        if (parentPlatform.getAdministrativeDivision() == null) {
            parentPlatform.setAdministrativeDivision(parentPlatform.getAdministrativeDivision());
        }
        platformMapper.updateParentPlatform(parentPlatform);
        // 更新redis
        redisCatchStorage.delPlatformCatchInfo(parentPlatformOld.getServerGBId());
        ParentPlatformCatch parentPlatformCatch = new ParentPlatformCatch();
        parentPlatformCatch.setParentPlatform(parentPlatform);
        parentPlatformCatch.setId(parentPlatform.getServerGBId());
        redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
        // 注册
        if (parentPlatform.isEnable()) {
            // 保存时启用就发送注册
            // 注册成功时由程序直接调用了online方法
            try {
                logger.info("[国标级联] 平台注册 {}", parentPlatform.getDeviceGBId());
                commanderForPlatform.register(parentPlatform, eventResult -> {
                    logger.info("[国标级联] {},添加向上级注册失败,请确定上级平台可用时重新保存", parentPlatform.getServerGBId());
                }, null);
            } catch (InvalidArgumentException | ParseException | SipException e) {
                logger.error("[命令发送失败] 国标级联: {}", e.getMessage());
            }
        }
        return false;
    }
    @Override
    public void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo) {
        logger.info("[国标级联]:{}, 平台上线", parentPlatform.getServerGBId());
        final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId();
        dynamicTask.stop(registerFailAgainTaskKey);
        platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true);
        ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
        if (parentPlatformCatch != null) {
            parentPlatformCatch.getParentPlatform().setStatus(true);
            redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
        }else {
        if (parentPlatformCatch == null) {
            parentPlatformCatch = new ParentPlatformCatch();
            parentPlatformCatch.setParentPlatform(parentPlatform);
            parentPlatformCatch.setId(parentPlatform.getServerGBId());
            parentPlatform.setStatus(true);
            parentPlatformCatch.setParentPlatform(parentPlatform);
            redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
        }
        parentPlatformCatch.getParentPlatform().setStatus(true);
        parentPlatformCatch.setSipTransactionInfo(sipTransactionInfo);
        redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
        final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
        if (dynamicTask.contains(registerTaskKey)) {
            dynamicTask.stop(registerTaskKey);
        }
        // 添加注册任务
        dynamicTask.startDelay(registerTaskKey,
        if (!dynamicTask.isAlive(registerTaskKey)) {
            logger.info("[国标级联]:{}, 添加定时注册任务", parentPlatform.getServerGBId());
            // 添加注册任务
            dynamicTask.startCron(registerTaskKey,
                // 注册失败(注册成功时由程序直接调用了online方法)
                ()->commanderForPlatform.register(parentPlatform, eventResult -> offline(parentPlatform),null),
                parentPlatform.getExpires()*1000);
                ()-> registerTask(parentPlatform, sipTransactionInfo),
                    parentPlatform.getExpires() * 1000);
        }
        final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
        if (!dynamicTask.contains(keepaliveTaskKey)) {
            logger.info("[国标级联]:{}, 添加定时心跳任务", parentPlatform.getServerGBId());
            // 添加心跳任务
            dynamicTask.startCron(keepaliveTaskKey,
                    ()-> commanderForPlatform.keepalive(parentPlatform, eventResult -> {
                        // 心跳失败
                        if (eventResult.type == SipSubscribe.EventResultType.timeout) {
                            // 心跳超时
                            ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
                            // 此时是第三次心跳超时, 平台离线
                            if (platformCatch.getKeepAliveReply()  == 2) {
                                // 设置平台离线,并重新注册
                                offline(parentPlatform);
                                logger.info("[国标级联] {},三次心跳超时后再次发起注册", parentPlatform.getServerGBId());
                                commanderForPlatform.register(parentPlatform, eventResult1 -> {
                                    logger.info("[国标级联] {},三次心跳超时后再次发起注册仍然失败,开始定时发起注册,间隔为1分钟", parentPlatform.getServerGBId());
                                    // 添加注册任务
                                    dynamicTask.startCron(registerTaskKey,
                                            // 注册失败(注册成功时由程序直接调用了online方法)
                                            ()->logger.info("[国标级联] {},平台离线后持续发起注册,失败", parentPlatform.getServerGBId()),
                                            60*1000);
                                }, null);
                            }
                    ()-> {
                        try {
                            commanderForPlatform.keepalive(parentPlatform, eventResult -> {
                                // 心跳失败
                                if (eventResult.type == SipSubscribe.EventResultType.timeout) {
                                    // 心跳超时
                                    ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
                                    // 此时是第三次心跳超时, 平台离线
                                    if (platformCatch.getKeepAliveReply()  == 2) {
                                        // 设置平台离线,并重新注册
                                        logger.info("[国标级联] 三次心跳超时, 平台{}({})离线", parentPlatform.getName(), parentPlatform.getServerGBId());
                                        offline(parentPlatform, false);
                        }else {
                            logger.warn("[国标级联]发送心跳收到错误,code: {}, msg: {}", eventResult.statusCode, eventResult.msg);
                        }
                                    }
                    }, eventResult -> {
                        // 心跳成功
                        // 清空之前的心跳超时计数
                        ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
                        if (platformCatch.getKeepAliveReply() > 0) {
                            platformCatch.setKeepAliveReply(0);
                            redisCatchStorage.updatePlatformCatchInfo(platformCatch);
                                }else {
                                    logger.warn("[国标级联]发送心跳收到错误,code: {}, msg: {}", eventResult.statusCode, eventResult.msg);
                                }
                            }, eventResult -> {
                                // 心跳成功
                                // 清空之前的心跳超时计数
                                ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
                                if (platformCatch != null && platformCatch.getKeepAliveReply() > 0) {
                                    platformCatch.setKeepAliveReply(0);
                                    redisCatchStorage.updatePlatformCatchInfo(platformCatch);
                                }
                            });
                        } catch (SipException | InvalidArgumentException | ParseException e) {
                            logger.error("[命令发送失败] 国标级联 发送心跳: {}", e.getMessage());
                        }
                    }),
                    parentPlatform.getExpires()*1000);
                    },
                    (parentPlatform.getKeepTimeout())*1000);
        }
    }
    private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){
        try {
            // 不在同一个会话中续订则每次全新注册
            if (!userSetting.isRegisterKeepIntDialog()) {
                sipTransactionInfo = null;
            }
            if (sipTransactionInfo == null) {
                logger.info("[国标级联] 平台:{}注册即将到期,开始重新注册", parentPlatform.getServerGBId());
            }else {
                logger.info("[国标级联] 平台:{}注册即将到期,开始续订", parentPlatform.getServerGBId());
            }
            commanderForPlatform.register(parentPlatform, sipTransactionInfo,  eventResult -> {
                logger.info("[国标级联] 平台:{}注册失败,{}:{}", parentPlatform.getServerGBId(),
                        eventResult.statusCode, eventResult.msg);
                offline(parentPlatform, false);
            }, null);
        } catch (InvalidArgumentException | ParseException | SipException e) {
            logger.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage());
        }
    }
    @Override
    public void offline(ParentPlatform parentPlatform) {
    public void offline(ParentPlatform parentPlatform, boolean stopRegister) {
        logger.info("[平台离线]:{}", parentPlatform.getServerGBId());
        ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
        parentPlatformCatch.setKeepAliveReply(0);
@@ -183,6 +296,7 @@
        // 停止所有推流
        logger.info("[平台离线] {}, 停止所有推流", parentPlatform.getServerGBId());
        stopAllPush(parentPlatform.getServerGBId());
        // 清除注册定时
        logger.info("[平台离线] {}, 停止定时注册任务", parentPlatform.getServerGBId());
        final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
@@ -193,40 +307,91 @@
        logger.info("[平台离线] {}, 停止定时发送心跳任务", parentPlatform.getServerGBId());
        final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
        if (dynamicTask.contains(keepaliveTaskKey)) {
            // 添加心跳任务
            // 清除心跳任务
            dynamicTask.stop(keepaliveTaskKey);
        }
        // 停止目录订阅回复
        logger.info("[平台离线] {}, 停止订阅回复", parentPlatform.getServerGBId());
        subscribeHolder.removeAllSubscribe(parentPlatform.getServerGBId());
        // 发起定时自动重新注册
        if (!stopRegister) {
            // 设置为60秒自动尝试重新注册
            final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId();
            ParentPlatform platform = platformMapper.getParentPlatById(parentPlatform.getId());
            if (platform.isEnable()) {
                dynamicTask.startCron(registerFailAgainTaskKey,
                        ()-> registerTask(platform, null),
                        userSetting.getRegisterAgainAfterTime() * 1000);
            }
        }
    }
    private void stopAllPush(String platformId) {
        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(platformId);
        if (sendRtpItems != null && sendRtpItems.size() > 0) {
            for (SendRtpItem sendRtpItem : sendRtpItems) {
                ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
                redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Map<String, Object> param = new HashMap<>(3);
                param.put("vhost", "__defaultVhost__");
                param.put("app", sendRtpItem.getApp());
                param.put("stream", sendRtpItem.getStreamId());
                zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
                zlmServerFactory.stopSendRtpStream(mediaInfo, param);
            }
        }
    }
    @Override
    public void login(ParentPlatform parentPlatform) {
        final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
        commanderForPlatform.register(parentPlatform, eventResult1 -> {
            logger.info("[国标级联] {},开始定时发起注册,间隔为1分钟", parentPlatform.getServerGBId());
            // 添加注册任务
            dynamicTask.startCron(registerTaskKey,
                    // 注册失败(注册成功时由程序直接调用了online方法)
                    ()->logger.info("[国标级联] {},平台离线后持续发起注册,失败", parentPlatform.getServerGBId()),
                    60*1000);
        }, null);
        try {
            commanderForPlatform.register(parentPlatform, eventResult1 -> {
                logger.info("[国标级联] {},开始定时发起注册,间隔为1分钟", parentPlatform.getServerGBId());
                // 添加注册任务
                dynamicTask.startCron(registerTaskKey,
                        // 注册失败(注册成功时由程序直接调用了online方法)
                        ()->logger.info("[国标级联] {},平台离线后持续发起注册,失败", parentPlatform.getServerGBId()),
                        60*1000);
            }, null);
        } catch (InvalidArgumentException | ParseException | SipException e) {
            logger.error("[命令发送失败] 国标级联注册: {}", e.getMessage());
        }
    }
    @Override
    public void sendNotifyMobilePosition(String platformId) {
        ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformId);
        if (platform == null) {
            return;
        }
        SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId());
        if (subscribe != null) {
            // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
            List<DeviceChannel> gbStreams = gbStreamMapper.queryGbStreamListInPlatform(platform.getServerGBId(), userSetting.isUsePushingAsStatus());
            if (gbStreams.size() == 0) {
                return;
            }
            for (DeviceChannel deviceChannel : gbStreams) {
                String gbId = deviceChannel.getChannelId();
                GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
                // 无最新位置不发送
                if (gpsMsgInfo != null) {
                    // 经纬度都为0不发送
                    if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) {
                        continue;
                    }
                    // 发送GPS消息
                    try {
                        commanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, subscribe);
                    } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
                             IllegalAccessException e) {
                        logger.error("[命令发送失败] 国标级联 移动位置通知: {}", e.getMessage());
                    }
                }
            }
        }
    }
}