648540858
2024-02-27 c8b0e66e0eedaaa368bac3a49bcf1d34fb5e7393
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
old mode 100644 new mode 100755
@@ -2,6 +2,7 @@
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
@@ -10,10 +11,11 @@
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IInviteStreamService;
@@ -28,6 +30,7 @@
import com.genersoft.iot.vmp.utils.DateUtil;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -35,17 +38,25 @@
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.PeerUnavailableException;
import javax.sip.SipException;
import javax.sip.SipFactory;
import javax.sip.address.Address;
import javax.sip.address.SipURI;
import javax.sip.header.*;
import javax.sip.message.Request;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
/**
 * @author lin
 */
@Service
@DS("master")
public class PlatformServiceImpl implements IPlatformService {
    private final static String REGISTER_KEY_PREFIX = "platform_register_";
@@ -57,15 +68,6 @@
    @Autowired
    private ParentPlatformMapper platformMapper;
    @Autowired
    private PlatformCatalogMapper catalogMapper;
    @Autowired
    private PlatformChannelMapper platformChannelMapper;
    @Autowired
    private PlatformGbStreamMapper platformGbStreamMapper;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
@@ -83,7 +85,7 @@
    private DynamicTask dynamicTask;
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private SubscribeHolder subscribeHolder;
@@ -169,7 +171,7 @@
        dynamicTask.stop(registerTaskKey);
        // 注销旧的
        try {
            if (parentPlatformOld.isStatus()) {
            if (parentPlatformOld.isStatus() && parentPlatformCatchOld != null) {
                logger.info("保存平台{}时发现旧平台在线,发送注销命令", parentPlatformOld.getServerGBId());
                commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> {
                    logger.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId());
@@ -207,6 +209,7 @@
                logger.error("[命令发送失败] 国标级联: {}", e.getMessage());
            }
        }
        return false;
    }
@@ -252,19 +255,19 @@
                        try {
                            commanderForPlatform.keepalive(parentPlatform, eventResult -> {
                                // 心跳失败
                                if (eventResult.type == SipSubscribe.EventResultType.timeout) {
                                    // 心跳超时
                                    ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
                                    // 此时是第三次心跳超时, 平台离线
                                    if (platformCatch.getKeepAliveReply()  == 2) {
                                        // 设置平台离线,并重新注册
                                        logger.info("[国标级联] 三次心跳超时, 平台{}({})离线", parentPlatform.getName(), parentPlatform.getServerGBId());
                                        offline(parentPlatform, false);
                                    }
                                }else {
                                if (eventResult.type != SipSubscribe.EventResultType.timeout) {
                                    logger.warn("[国标级联]发送心跳收到错误,code: {}, msg: {}", eventResult.statusCode, eventResult.msg);
                                }
                                // 心跳失败
                                ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
                                // 此时是第三次心跳超时, 平台离线
                                if (platformCatch.getKeepAliveReply()  == 2) {
                                    // 设置平台离线,并重新注册
                                    logger.info("[国标级联] 三次心跳失败, 平台{}({})离线", parentPlatform.getName(), parentPlatform.getServerGBId());
                                    offline(parentPlatform, false);
                                }else {
                                    platformCatch.setKeepAliveReply(platformCatch.getKeepAliveReply() + 1);
                                    redisCatchStorage.updatePlatformCatchInfo(platformCatch);
                                }
                            }, eventResult -> {
@@ -275,6 +278,7 @@
                                    platformCatch.setKeepAliveReply(0);
                                    redisCatchStorage.updatePlatformCatchInfo(platformCatch);
                                }
                                logger.info("[发送心跳] 国标级联 发送心跳, code: {}, msg: {}", eventResult.statusCode, eventResult.msg);
                            });
                        } catch (SipException | InvalidArgumentException | ParseException e) {
                            logger.error("[命令发送失败] 国标级联 发送心跳: {}", e.getMessage());
@@ -282,6 +286,32 @@
                    },
                    (parentPlatform.getKeepTimeout())*1000);
        }
        if (parentPlatform.isAutoPushChannel()) {
            if (subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()) == null) {
                logger.info("[国标级联]:{}, 添加自动通道推送模拟订阅信息", parentPlatform.getServerGBId());
                addSimulatedSubscribeInfo(parentPlatform);
            }
        }else {
            SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId());
            if (catalogSubscribe != null && catalogSubscribe.getExpires() == -1) {
                subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId());
            }
        }
    }
    @Override
    public void addSimulatedSubscribeInfo(ParentPlatform parentPlatform) {
        // 自动添加一条模拟的订阅信息
        SubscribeInfo subscribeInfo = new SubscribeInfo();
        subscribeInfo.setId(parentPlatform.getServerGBId());
        subscribeInfo.setExpires(-1);
        subscribeInfo.setEventType("Catalog");
        int random = (int) Math.floor(Math.random() * 10000);
        subscribeInfo.setEventId(random + "");
        subscribeInfo.setSimulatedCallId(UUID.randomUUID().toString().replace("-", "") + "@" + parentPlatform.getServerIP());
        subscribeInfo.setSimulatedFromTag(UUID.randomUUID().toString().replace("-", ""));
        subscribeInfo.setSimulatedToTag(UUID.randomUUID().toString().replace("-", ""));
        subscribeHolder.putCatalogSubscribe(parentPlatform.getServerGBId(), subscribeInfo);
    }
    private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){
@@ -302,7 +332,7 @@
                        eventResult.statusCode, eventResult.msg);
                offline(parentPlatform, false);
            }, null);
        } catch (InvalidArgumentException | ParseException | SipException e) {
        } catch (Exception e) {
            logger.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage());
        }
    }
@@ -336,9 +366,16 @@
            // 清除心跳任务
            dynamicTask.stop(keepaliveTaskKey);
        }
        // 停止目录订阅回复
        logger.info("[平台离线] {}, 停止订阅回复", parentPlatform.getServerGBId());
        subscribeHolder.removeAllSubscribe(parentPlatform.getServerGBId());
        // 停止订阅回复
        SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId());
        if (catalogSubscribe != null) {
            if (catalogSubscribe.getExpires() > 0) {
                logger.info("[平台离线] {}, 停止目录订阅回复", parentPlatform.getServerGBId());
                subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId());
            }
        }
        logger.info("[平台离线] {}, 停止移动位置订阅回复", parentPlatform.getServerGBId());
        subscribeHolder.removeMobilePositionSubscribe(parentPlatform.getServerGBId());
        // 发起定时自动重新注册
        if (!stopRegister) {
            // 设置为60秒自动尝试重新注册
@@ -364,7 +401,7 @@
                param.put("vhost", "__defaultVhost__");
                param.put("app", sendRtpItem.getApp());
                param.put("stream", sendRtpItem.getStream());
                zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
                zlmServerFactory.stopSendRtpStream(mediaInfo, param);
            }
        }
    }
@@ -431,12 +468,11 @@
        }
        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, platform.getServerGBId(), channelId);
        if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
            // 如果zlm不存在这个流,则删除数据即可
            MediaServerItem mediaServerItemForStreamInfo = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
            if (mediaServerItemForStreamInfo != null) {
                Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemForStreamInfo, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream());
                Boolean ready = zlmServerFactory.isStreamReady(mediaServerItemForStreamInfo, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream());
                if (!ready) {
                    // 错误存在于redis中的数据
                    inviteStreamService.removeInviteInfo(inviteInfo);
@@ -556,7 +592,23 @@
    }
    @Override
    public void stopBroadcast(ParentPlatform platform, String channelId, String stream) throws InvalidArgumentException, ParseException, SsrcTransactionNotFoundException, SipException {
        commanderForPlatform.streamByeCmd(platform, channelId, stream, null, null);
    public void stopBroadcast(ParentPlatform platform, DeviceChannel channel, String stream, boolean sendBye, MediaServerItem mediaServerItem) {
        try {
            if (sendBye) {
                commanderForPlatform.streamByeCmd(platform, channel.getChannelId(), stream, null, null);
            }
        } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
            logger.warn("[消息发送失败] 停止语音对讲, 平台:{},通道:{}", platform.getId(), channel.getChannelId() );
        } finally {
            mediaServerService.closeRTPServer(mediaServerItem, stream);
            InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, platform.getServerGBId(), channel.getChannelId(), stream);
            if (inviteInfo != null) {
                // 释放ssrc
                mediaServerService.releaseSsrc(mediaServerItem.getId(), inviteInfo.getSsrcInfo().getSsrc());
                inviteStreamService.removeInviteInfo(inviteInfo);
            }
            streamSession.remove(platform.getServerGBId(), channel.getChannelId(), stream);
        }
    }
}