648540858
2022-06-14 e0344ccf9725fe3d22a90ab11257396071e7f55f
国标级联推送推流 支持多wvp间自动选择与推送
29个文件已修改
7个文件已添加
1个文件已重命名
1481 ■■■■ 已修改文件
sql/update.sql 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 78 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 226 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 25 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java 170 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/RequestSendItemMsg.java 173 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/ResponseSendItemMsg.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsg.java 116 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java 377 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java 83 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
web_src/src/components/dialog/rtcPlayer.vue 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sql/update.sql
@@ -1,12 +1,4 @@
alter table parent_platform
    add startOfflinePush int default 0 null;
alter table stream_push
    add serverId varchar(50) not null;
alter table parent_platform
    add administrativeDivision varchar(50) not null;
alter table parent_platform
    add catalogGroup int default 1 null;
alter table device
    add ssrcCheck int default 0 null;
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -97,4 +97,5 @@
    //**************************    第三方  ****************************************
    public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_";
    public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_";
}
src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java
@@ -2,7 +2,9 @@
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.service.impl.RedisAlarmMsgListener;
import com.genersoft.iot.vmp.service.impl.RedisGPSMsgListener;
import com.genersoft.iot.vmp.service.impl.RedisGpsMsgListener;
import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.service.impl.RedisStreamMsgListener;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -47,10 +49,16 @@
    private int poolMaxWait;
    @Autowired
    private RedisGPSMsgListener redisGPSMsgListener;
    private RedisGpsMsgListener redisGPSMsgListener;
    @Autowired
    private RedisAlarmMsgListener redisAlarmMsgListener;
    @Autowired
    private RedisStreamMsgListener redisStreamMsgListener;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Bean
    public JedisPool jedisPool() {
@@ -98,6 +106,8 @@
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS));
        container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
        container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
        container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
        return container;
    }
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -72,6 +72,11 @@
    private String mediaServerId;
    /**
     * 使用的服务的ID
     */
    private String serverId;
    /**
     *  invite的callId
     */
    private String CallId;
@@ -259,4 +264,12 @@
    public void setOnlyAudio(boolean onlyAudio) {
        this.onlyAudio = onlyAudio;
    }
    /**
     * Returns the ID of the server (service) in use for this send item.
     *
     * @return the current value of the {@code serverId} field; may be {@code null} if never set
     */
    public String getServerId() {
        return serverId;
    }
    /**
     * Sets the ID of the server (service) in use for this send item.
     *
     * @param serverId the value to store in the {@code serverId} field
     */
    public void setServerId(String serverId) {
        this.serverId = serverId;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
@@ -71,7 +71,9 @@
                String gbId = gbStream.getGbId();
                GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
                if (gpsMsgInfo != null) { // 无最新位置不发送
                    logger.info("无最新位置不发送");
                   if (logger.isDebugEnabled()) {
                       logger.debug("无最新位置不发送");
                   }
                    // 经纬度都为0不发送
                    if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) {
                        continue;
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -16,6 +16,8 @@
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.SerializeUtils;
@@ -43,7 +45,7 @@
public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
    private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class);
    private String method = "ACK";
    private final String method = "ACK";
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
@@ -77,6 +79,9 @@
    @Autowired
    private ISIPCommanderForPlatform commanderForPlatform;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    /**   
@@ -114,7 +119,24 @@
            param.put("pt", sendRtpItem.getPt());
            param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
            param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
            if (mediaInfo == null) {
                RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
                        sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(),
                        sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
                        sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
                redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, jsonObject->{
                    startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader);
                });
            }else {
            JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
                startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader);
            }
        }
    }
    private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
                                        JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
            if (jsonObject == null) {
                logger.error("RTP推流失败: 请检查ZLM服务");
            } else if (jsonObject.getInteger("code") == 0) {
@@ -132,60 +154,6 @@
                    // 向上级平台
                    commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
                }
            }
//            if (streamInfo == null) { // 流还没上来,对方就回复ack
//                logger.info("监听流以等待流上线1 rtp/{}", sendRtpItem.getStreamId());
//                // 监听流上线
//                // 添加订阅
//                JSONObject subscribeKey = new JSONObject();
//                subscribeKey.put("app", "rtp");
//                subscribeKey.put("stream", sendRtpItem.getStreamId());
//                subscribeKey.put("regist", true);
//                subscribeKey.put("schema", "rtmp");
//                subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId());
//                subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
//                        (MediaServerItem mediaServerItemInUse, JSONObject json)->{
//                            Map<String, Object> param = new HashMap<>();
//                            param.put("vhost","__defaultVhost__");
//                            param.put("app",json.getString("app"));
//                            param.put("stream",json.getString("stream"));
//                            param.put("ssrc", sendRtpItem.getSsrc());
//                            param.put("dst_url",sendRtpItem.getIp());
//                            param.put("dst_port", sendRtpItem.getPort());
//                            param.put("is_udp", is_Udp);
//                            param.put("src_port", sendRtpItem.getLocalPort());
//                            zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
//                        });
//            }else {
//                Map<String, Object> param = new HashMap<>();
//                param.put("vhost","__defaultVhost__");
//                param.put("app",streamInfo.getApp());
//                param.put("stream",streamInfo.getStream());
//                param.put("ssrc", sendRtpItem.getSsrc());
//                param.put("dst_url",sendRtpItem.getIp());
//                param.put("dst_port", sendRtpItem.getPort());
//                param.put("is_udp", is_Udp);
//                param.put("src_port", sendRtpItem.getLocalPort());
//
//                JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
//                if (jsonObject.getInteger("code") != 0) {
//                    logger.info("监听流以等待流上线2 {}/{}", streamInfo.getApp(), streamInfo.getStream());
//                    // 监听流上线
//                    // 添加订阅
//                    JSONObject subscribeKey = new JSONObject();
//                    subscribeKey.put("app", "rtp");
//                    subscribeKey.put("stream", streamInfo.getStream());
//                    subscribeKey.put("regist", true);
//                    subscribeKey.put("schema", "rtmp");
//                    subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId());
//                    subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
//                            (MediaServerItem mediaServerItemInUse, JSONObject json)->{
//                                zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
//                            });
//                }
//            }
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -107,13 +107,9 @@
                            cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null);
                        }
                        if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
                            MessageForPushChannel messageForPushChannel = new MessageForPushChannel();
                            messageForPushChannel.setType(0);
                            messageForPushChannel.setGbId(sendRtpItem.getChannelId());
                            messageForPushChannel.setApp(sendRtpItem.getApp());
                            messageForPushChannel.setStream(sendRtpItem.getStreamId());
                            messageForPushChannel.setMediaServerId(sendRtpItem.getMediaServerId());
                            messageForPushChannel.setPlatFormId(sendRtpItem.getPlatformId());
                            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                                    sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
                                    sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId());
                            redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
                        }
                    }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java
@@ -15,7 +15,7 @@
@Component
public class CancelRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
    private String method = "CANCEL";
    private final String method = "CANCEL";
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -17,10 +17,13 @@
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -52,13 +55,16 @@
    private final static Logger logger = LoggerFactory.getLogger(InviteRequestProcessor.class);
    private String method = "INVITE";
    private final String method = "INVITE";
    @Autowired
    private SIPCommanderFroPlatform cmderFroPlatform;
    @Autowired
    private IVideoManagerStorage storager;
    @Autowired
    private IStreamPushService streamPushService;
    @Autowired
    private IRedisCatchStorage  redisCatchStorage;
@@ -94,6 +100,10 @@
    private ZLMMediaListManager mediaListManager;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
@@ -103,15 +113,14 @@
    /**
     * 处理invite请求
     * 
     * @param evt
     *            请求消息
     * @param evt 请求消息
     */ 
    @Override
    public void process(RequestEvent evt) {
        //  Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令
        try {
            Request request = evt.getRequest();
            SipURI sipURI = (SipURI) request.getRequestURI();
            SipURI sipUri = (SipURI) request.getRequestURI();
            //从subject读取channelId,不再从request-line读取。 有些平台request-line是平台国标编码,不是设备国标编码。
            //String channelId = sipURI.getUser();
            String channelId = SipUtils.getChannelIdFromHeader(request);
@@ -119,7 +128,8 @@
            CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
            if (requesterId == null || channelId == null) {
                logger.info("无法从FromHeader的Address中获取到平台id,返回400");
                responseAck(evt, Response.BAD_REQUEST); // 参数不全, 发400,请求错误
                // 参数不全, 发400,请求错误
                responseAck(evt, Response.BAD_REQUEST);
                return;
            }
@@ -132,7 +142,9 @@
                DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
                GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
                PlatformCatalog catalog = storager.getCatalog(channelId);
                MediaServerItem mediaServerItem = null;
                StreamPushItem streamPushItem = null;
                // 不是通道可能是直播流
                if (channel != null && gbStream == null ) {
                    if (channel.getStatus() == 0) {
@@ -142,12 +154,31 @@
                    }
                    responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
                }else if(channel == null && gbStream != null){
                    String mediaServerId = gbStream.getMediaServerId();
                    mediaServerItem = mediaServerService.getOne(mediaServerId);
                    if (mediaServerItem == null) {
                        if ("proxy".equals(gbStream.getStreamType())) {
                        logger.info("[ app={}, stream={} ]找不到zlm {},返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId);
                        responseAck(evt, Response.GONE);
                        return;
                        } else {
                            streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
                            if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) {
                                logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
                                responseAck(evt, Response.GONE);
                                return;
                            }
                        }
                    } else {
                        if ("push".equals(gbStream.getStreamType())) {
                            streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
                            if (streamPushItem == null) {
                                logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
                                responseAck(evt, Response.GONE);
                                return;
                            }
                        }
                    }
                    responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
                }else if (catalog != null) {
@@ -388,9 +419,87 @@
                        }
                    }
                }else if (gbStream != null) {
                    if (streamPushItem.isStatus()) {
                        // 在线状态
                        pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                    } else {
                        // 不在线 拉起
                        notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                    }
                }
            }
        } catch (SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
            logger.warn("sdp解析错误");
            e.printStackTrace();
        } catch (SdpParseException e) {
            e.printStackTrace();
        } catch (SdpException e) {
            e.printStackTrace();
        }
    }
    /**
     * Arranges RTP sending of a push-type stream towards the requesting platform.
     *
     * <p>If the stream's server id ({@code streamPushItem.getServerId()}) equals this instance's
     * server id ({@code userSetting.getServerId()}), the media server is asked whether the stream
     * is ready. When it is ready, a {@link SendRtpItem} is created, filled in (play type PUSH,
     * status 1, call id, serialized dialog and server transaction), stored through
     * {@code redisCatchStorage.updateSendRTPSever} and acknowledged via {@code sendStreamAck}.
     * When it is not ready, {@code notifyStreamOnline} is called instead. If the server ids differ,
     * the request is handed to {@code otherWvpPushStream}.
     *
     * @param evt                  the incoming request event; its dialog and server transaction are
     *                             serialized into the send item, and it is used for the SIP reply
     * @param gbStream             supplies the app and stream names
     * @param streamPushItem       supplies the server id compared with the local server id
     * @param platform             passed through to {@code sendStreamAck} and the fallback helpers
     * @param callIdHeader         its call id is stored on the send item
     * @param mediaServerItem      media server used for the ready check and for creating the send item
     * @param port                 passed to {@code createSendRtpItem}
     * @param tcpActive            copied onto the send item when not {@code null}
     * @param mediaTransmissionTCP passed to {@code createSendRtpItem}
     * @param channelId            passed to {@code createSendRtpItem}
     * @param addressStr           passed to {@code createSendRtpItem}
     * @param ssrc                 passed to {@code createSendRtpItem}
     * @param requesterId          passed to {@code createSendRtpItem}
     * @throws InvalidArgumentException declared; presumably propagated from the SIP reply calls — confirm
     * @throws ParseException           declared; presumably propagated from the SIP reply calls — confirm
     * @throws SipException             declared; presumably propagated from the SIP reply calls — confirm
     */
    private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
        // Push the stream
        if (streamPushItem.getServerId().equals(userSetting.getServerId())) {
                    // NOTE(review): the two consecutive `if` lines below test opposite conditions and
                    // leave the braces unbalanced; they look like an old/new pair from a diff.
                    // Confirm which one is intended (the branch comments suggest `if (streamReady)`).
                    // NOTE(review): streamReady is a boxed Boolean; a null result would fail on unboxing — confirm
                    // that isStreamReady never returns null.
                    Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
                    if (!streamReady ) {
            if (streamReady) {
                // Stream belongs to this platform
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId,
                        mediaTransmissionTCP);
                // A null send item is reported as a lack of server port resources and answered with BUSY_HERE
                if (sendRtpItem == null) {
                    logger.warn("服务器端口资源不足");
                    responseAck(evt, Response.BUSY_HERE);
                    return;
                }
                if (tcpActive != null) {
                    sendRtpItem.setTcpActive(tcpActive);
                }
                sendRtpItem.setPlayType(InviteStreamType.PUSH);
                // Write to redis; reply on timeout
                sendRtpItem.setStatus(1);
                sendRtpItem.setCallId(callIdHeader.getCallId());
                byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
                sendRtpItem.setDialog(dialogByteArray);
                byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
                sendRtpItem.setTransaction(transactionByteArray);
                redisCatchStorage.updateSendRTPSever(sendRtpItem);
                sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
            } else {
                // Not online: bring the stream up
                notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
            }
        } else {
            // Stream belongs to another platform
            otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
        }
    }
    /**
     * 通知流上线
     */
    private void notifyStreamOnline(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                                    CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                                    int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                                    String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
                        if ("proxy".equals(gbStream.getStreamType())) {
                            // TODO 控制启用以使设备上线
                            logger.info("[ app={}, stream={} ]通道离线,启用流后开始推流",gbStream.getApp(), gbStream.getStream());
@@ -402,15 +511,10 @@
                            }
                            // 发送redis消息以使设备上线
                            logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流",gbStream.getApp(), gbStream.getStream());
                            MessageForPushChannel messageForPushChannel = new MessageForPushChannel();
                            messageForPushChannel.setType(1);
                            messageForPushChannel.setGbId(gbStream.getGbId());
                            messageForPushChannel.setApp(gbStream.getApp());
                            messageForPushChannel.setStream(gbStream.getStream());
                            // TODO 获取低负载的节点
                            messageForPushChannel.setMediaServerId(gbStream.getMediaServerId());
                            messageForPushChannel.setPlatFormId(platform.getServerGBId());
                            messageForPushChannel.setPlatFormName(platform.getName());
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
                    gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
                    platform.getName(), null, gbStream.getMediaServerId());
                            redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
                            // 设置超时
                            dynamicTask.startDelay(callIdHeader.getCallId(), ()->{
@@ -427,13 +531,15 @@
                                }
                            }, userSetting.getPlatformPlayTimeout());
                            // 添加监听
                            MediaServerItem finalMediaServerItem = mediaServerItem;
                            int finalPort = port;
                            boolean finalMediaTransmissionTCP = mediaTransmissionTCP;
                            Boolean finalTcpActive = tcpActive;
                            mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream)->{
                                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(finalMediaServerItem, addressStr, finalPort, ssrc, requesterId,
                                        app, stream, channelId, finalMediaTransmissionTCP);
            // 添加在本机上线的通知
            mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream, serverId) -> {
                dynamicTask.stop(callIdHeader.getCallId());
                if (serverId.equals(userSetting.getServerId())) {
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                            app, stream, channelId, mediaTransmissionTCP);
                                if (sendRtpItem == null) {
                                    logger.warn("服务器端口资源不足");
@@ -460,21 +566,43 @@
                                byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
                                sendRtpItem.setTransaction(transactionByteArray);
                                redisCatchStorage.updateSendRTPSever(sendRtpItem);
                                sendStreamAck(finalMediaServerItem, sendRtpItem, platform, evt);
                    sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
                } else {
                    // 其他平台内容
                    otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                            mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                }
                            });
                        }
                    }else {
                        SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                                gbStream.getApp(), gbStream.getStream(), channelId,
                                mediaTransmissionTCP);
    }
                        if (sendRtpItem == null) {
    /**
     * 来自其他wvp的推流
     */
    private void otherWvpPushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                                    CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                                    int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                                    String channelId, String addressStr, String ssrc, String requesterId) {
        logger.info("[级联点播]直播流来自其他平台,发送redis消息");
        // 发送redis消息
        redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(),
                streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId,
                channelId, mediaTransmissionTCP, null, responseSendItemMsg -> {
                    SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem();
                    if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) {
                            logger.warn("服务器端口资源不足");
                        try {
                            responseAck(evt, Response.BUSY_HERE);
                        } catch (SipException e) {
                            e.printStackTrace();
                        } catch (InvalidArgumentException e) {
                            e.printStackTrace();
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                            return;
                        }
                    // 收到sendItem
                        if (tcpActive != null) {
                            sendRtpItem.setTcpActive(tcpActive);
                        }
@@ -487,23 +615,45 @@
                        byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
                        sendRtpItem.setTransaction(transactionByteArray);
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
                        sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
                    sendStreamAck(responseSendItemMsg.getMediaServerItem(), sendRtpItem, platform, evt);
                }, (wvpResult) -> {
                    try {
                        // 错误
                        if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
                            // 离线
                            // 查询是否在本机上线了
                            StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
                            if (currentStreamPushItem.isStatus()) {
                                // 在线状态
                                pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            } else {
                                // 不在线 拉起
                                notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            }
                        }
                    } catch (InvalidArgumentException e) {
                        throw new RuntimeException(e);
                    } catch (ParseException e) {
                        throw new RuntimeException(e);
                    } catch (SipException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        } catch (SipException | InvalidArgumentException | ParseException e) {
                    try {
                        responseAck(evt, Response.BUSY_HERE);
                    } catch (SipException e) {
            e.printStackTrace();
            logger.warn("sdp解析错误");
                    } catch (InvalidArgumentException e) {
            e.printStackTrace();
        } catch (SdpParseException e) {
            e.printStackTrace();
        } catch (SdpException e) {
                    } catch (ParseException e) {
            e.printStackTrace();
        }
                    return;
                });
    }
    public void sendStreamAck(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt){
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
@@ -41,7 +41,7 @@
    private final Logger logger = LoggerFactory.getLogger(RegisterRequestProcessor.class);
    public String method = "REGISTER";
    public final String method = "REGISTER";
    @Autowired
    private SipConfig sipConfig;
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java
@@ -40,9 +40,7 @@
public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
    private Logger logger = LoggerFactory.getLogger(RecordInfoResponseMessageHandler.class);
    public static volatile List<String> threadNameList = new ArrayList();
    private final String cmdType = "RecordInfo";
    private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_";
    private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java
@@ -17,7 +17,7 @@
@Component
public class ByeResponseProcessor extends SIPResponseProcessorAbstract {
    private String method = "BYE";
    private final String method = "BYE";
    @Autowired
    private SipLayer sipLayer;
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java
@@ -17,7 +17,7 @@
@Component
public class CancelResponseProcessor extends SIPResponseProcessorAbstract {
    private String method = "CANCEL";
    private final String method = "CANCEL";
    @Autowired
    private SipLayer sipLayer;
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java
@@ -31,7 +31,7 @@
public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
    private final static Logger logger = LoggerFactory.getLogger(InviteResponseProcessor.class);
    private String method = "INVITE";
    private final String method = "INVITE";
    @Autowired
    private SipLayer sipLayer;
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
@@ -27,7 +27,7 @@
public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
    private Logger logger = LoggerFactory.getLogger(RegisterResponseProcessor.class);
    private String method = "REGISTER";
    private final String method = "REGISTER";
    @Autowired
    private ISIPCommanderForPlatform sipCommanderForPlatform;
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -397,21 +397,22 @@
                            if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
                                    || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
                                    || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
                                streamPushItem = zlmMediaListManager.addPush(item);
                                item.setSeverId(userSetting.getServerId());
                                zlmMediaListManager.addPush(item);
                            }
                            List<GbStream> gbStreams = new ArrayList<>();
                            if (streamPushItem == null || streamPushItem.getGbId() == null) {
                                GbStream gbStream = storager.getGbStream(app, streamId);
                                gbStreams.add(gbStream);
                            }else {
                                if (streamPushItem.getGbId() != null) {
                                    gbStreams.add(streamPushItem);
                                }
                            }
                            if (gbStreams.size() > 0) {
//                            List<GbStream> gbStreams = new ArrayList<>();
//                            if (streamPushItem == null || streamPushItem.getGbId() == null) {
//                                GbStream gbStream = storager.getGbStream(app, streamId);
//                                gbStreams.add(gbStream);
//                            }else {
//                                if (streamPushItem.getGbId() != null) {
//                                    gbStreams.add(streamPushItem);
//                                }
//                            }
//                            if (gbStreams.size() > 0) {
//                                eventPublisher.catalogEventPublishForStream(null, gbStreams, CatalogEvent.ON);
                            }
//                            }
                        }else {
                            // 兼容流注销时类型从redis记录获取
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
@@ -24,6 +24,9 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * @author lin
 */
@Component
public class ZLMMediaListManager {
@@ -147,7 +150,6 @@
                    }
                }
            }
            //            StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(transform.getApp(), transform.getStream());
            List<GbStream> gbStreamList = gbStreamMapper.selectByGBId(transform.getGbId());
            if (gbStreamList != null && gbStreamList.size() == 1) {
                transform.setGbStreamId(gbStreamList.get(0).getGbStreamId());
@@ -162,12 +164,11 @@
            }
            if (transform != null) {
                if (channelOnlineEvents.get(transform.getGbId()) != null)  {
                    channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream());
                    channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream(), transform.getServerId());
                    channelOnlineEvents.remove(transform.getGbId());
                }
            }
        }
        storager.updateMedia(transform);
        return transform;
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -2,6 +2,7 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import org.slf4j.Logger;
@@ -19,6 +20,9 @@
    @Autowired
    private ZLMRESTfulUtils zlmresTfulUtils;
    @Autowired
    private UserSetting userSetting;
    private int[] portRangeArray = new int[2];
@@ -197,6 +201,7 @@
        sendRtpItem.setTcp(tcp);
        sendRtpItem.setApp("rtp");
        sendRtpItem.setLocalPort(localPort);
        sendRtpItem.setServerId(userSetting.getServerId());
        sendRtpItem.setMediaServerId(serverItem.getId());
        return sendRtpItem;
    }
@@ -238,6 +243,7 @@
        sendRtpItem.setChannelId(channelId);
        sendRtpItem.setTcp(tcp);
        sendRtpItem.setLocalPort(localPort);
        sendRtpItem.setServerId(userSetting.getServerId());
        sendRtpItem.setMediaServerId(serverItem.getId());
        return sendRtpItem;
    }
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
@@ -1,6 +1,9 @@
package com.genersoft.iot.vmp.media.zlm.dto;
/**
 * Callback run with the app and stream of a media item; in this listing it is looked up
 * by GB id and invoked from ZLMMediaListManager.
 *
 * @author lin
 */
public interface ChannelOnlineEvent {
    // NOTE(review): both a two-argument and a three-argument run are listed here; this looks
    // like the pre-change and post-change signature of the same method - confirm which one
    // the current file actually declares.
    void run(String app, String stream);
    // Same callback, additionally passed the server id of the item.
    void run(String app, String stream, String serverId);
}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java
@@ -61,9 +61,14 @@
    private String originUrl;
    /**
     * 服务器id
     * 流媒体服务器id
     */
    private String mediaServerId;
    /**
     * 服务器id
     */
    private String severId;
    /**
     * GMT unix系统时间戳,单位秒
@@ -414,4 +419,12 @@
    /**
     * Stores the given stream info on this item.
     *
     * @param streamInfo the value to store
     */
    public void setStreamInfo(StreamInfo streamInfo) {
        this.streamInfo = streamInfo;
    }
    /**
     * Returns the server id stored on this item (field {@code severId}, documented as the
     * server id as opposed to the media server id).
     *
     * @return the stored server id, possibly {@code null} if it was never set
     */
    public String getSeverId() {
        return severId;
    }
    /**
     * Stores the server id on this item.
     *
     * @param severId the server id to store
     */
    public void setSeverId(String severId) {
        this.severId = severId;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java
@@ -81,6 +81,11 @@
     */
    private String mediaServerId;
    /**
     * 使用的服务ID
     */
    private String serverId;
    /**
     * Returns the vhost stored on this item.
     *
     * @return the stored vhost
     */
    public String getVhost() {
        return vhost;
    }
@@ -219,5 +224,13 @@
    /**
     * Stores the media server id on this item.
     *
     * @param mediaServerId the media server id to store
     */
    public void setMediaServerId(String mediaServerId) {
        this.mediaServerId = mediaServerId;
    }
    /**
     * Returns the id of the service in use for this push item.
     *
     * @return the stored service id, possibly {@code null} if it was never set
     */
    public String getServerId() {
        return serverId;
    }
    /**
     * Stores the id of the service in use for this push item.
     *
     * @param serverId the service id to store
     */
    public void setServerId(String serverId) {
        this.serverId = serverId;
    }
}
src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java
@@ -23,7 +23,6 @@
    private IVideoManagerStorage storager;
    @Scheduled(fixedRate = 30 * 1000)   //每30秒执行一次
    public void execute(){
        List<GPSMsgInfo> gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo();
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
@@ -1,7 +1,10 @@
package com.genersoft.iot.vmp.service.bean;
import java.util.stream.Stream;
/**
 * 当上级平台
 * @author lin
 */
public class MessageForPushChannel {
    /**
@@ -45,6 +48,20 @@
     */
    private String mediaServerId;
    public static MessageForPushChannel getInstance(int type, String app, String stream, String gbId,
                                                    String platFormId, String platFormName, String serverId,
                                                    String mediaServerId){
        MessageForPushChannel messageForPushChannel = new MessageForPushChannel();
        messageForPushChannel.setType(type);
        messageForPushChannel.setGbId(gbId);
        messageForPushChannel.setApp(app);
        messageForPushChannel.setStream(stream);
        messageForPushChannel.setMediaServerId(mediaServerId);
        messageForPushChannel.setPlatFormId(platFormId);
        messageForPushChannel.setPlatFormName(platFormName);
        return messageForPushChannel;
    }
    public int getType() {
        return type;
src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java
New file
@@ -0,0 +1,170 @@
package com.genersoft.iot.vmp.service.bean;
/**
 * Redis message payload: asks a subordinate node to push a stream.
 *
 * <p>A plain mutable bean with conventional accessors. {@code getInstance} creates an
 * instance with every property filled in.
 *
 * @author lin
 */
public class RequestPushStreamMsg {

    /** Id of the media server on the subordinate node. */
    private String mediaServerId;

    /** Application name of the stream. */
    private String app;

    /** Stream id. */
    private String stream;

    /** Destination IP. */
    private String ip;

    /** Destination port. */
    private int port;

    /** SSRC. */
    private String ssrc;

    /** Whether TCP is used. */
    private boolean tcp;

    /** Local port to send from. */
    private int srcPort;

    /**
     * RTP payload type (pt) used when sending. The original note says the receiving side
     * falls back to 96 when it is not passed.
     */
    private int pt;

    /** RTP payload kind when sending: {@code true} for PS, {@code false} for ES. */
    private boolean ps;

    /** Whether only audio is sent. */
    private boolean onlyAudio;

    /**
     * Creates a message with all properties populated from the arguments.
     *
     * @return a new instance carrying exactly the supplied values
     */
    public static RequestPushStreamMsg getInstance(String mediaServerId, String app, String stream, String ip, int port, String ssrc,
                                boolean tcp, int srcPort, int pt, boolean ps, boolean onlyAudio) {
        RequestPushStreamMsg msg = new RequestPushStreamMsg();
        msg.mediaServerId = mediaServerId;
        msg.app = app;
        msg.stream = stream;
        msg.ip = ip;
        msg.port = port;
        msg.ssrc = ssrc;
        msg.tcp = tcp;
        msg.srcPort = srcPort;
        msg.pt = pt;
        msg.ps = ps;
        msg.onlyAudio = onlyAudio;
        return msg;
    }

    // ---- stream identity ----

    public String getMediaServerId() {
        return this.mediaServerId;
    }

    public void setMediaServerId(String mediaServerId) {
        this.mediaServerId = mediaServerId;
    }

    public String getApp() {
        return this.app;
    }

    public void setApp(String app) {
        this.app = app;
    }

    public String getStream() {
        return this.stream;
    }

    public void setStream(String stream) {
        this.stream = stream;
    }

    // ---- destination ----

    public String getIp() {
        return this.ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getPort() {
        return this.port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getSsrc() {
        return this.ssrc;
    }

    public void setSsrc(String ssrc) {
        this.ssrc = ssrc;
    }

    // ---- transport / RTP options ----

    public boolean isTcp() {
        return this.tcp;
    }

    public void setTcp(boolean tcp) {
        this.tcp = tcp;
    }

    public int getSrcPort() {
        return this.srcPort;
    }

    public void setSrcPort(int srcPort) {
        this.srcPort = srcPort;
    }

    public int getPt() {
        return this.pt;
    }

    public void setPt(int pt) {
        this.pt = pt;
    }

    public boolean isPs() {
        return this.ps;
    }

    public void setPs(boolean ps) {
        this.ps = ps;
    }

    public boolean isOnlyAudio() {
        return this.onlyAudio;
    }

    public void setOnlyAudio(boolean onlyAudio) {
        this.onlyAudio = onlyAudio;
    }
}
src/main/java/com/genersoft/iot/vmp/service/bean/RequestSendItemMsg.java
New file
@@ -0,0 +1,173 @@
package com.genersoft.iot.vmp.service.bean;
/**
 * Redis message payload: asks a subordinate node to reply with its send-item information.
 *
 * <p>A plain mutable bean with conventional accessors. {@code getInstance} creates an
 * instance with every property filled in.
 *
 * @author lin
 */
public class RequestSendItemMsg {

    /** Id of the subordinate service. */
    private String serverId;

    /** Id of the media server on the subordinate service. */
    private String mediaServerId;

    /** Application name of the stream. */
    private String app;

    /** Stream id. */
    private String stream;

    /** Destination IP. */
    private String ip;

    /** Destination port. */
    private int port;

    /** SSRC. */
    private String ssrc;

    /** GB code of the platform. */
    private String platformId;

    /** Name of the platform. */
    private String platformName;

    /** Channel id. */
    private String channelId;

    /** Whether TCP is used; may be {@code null}. */
    private Boolean isTcp;

    /**
     * Creates a message with all properties populated from the arguments.
     *
     * <p>Note the argument order: {@code platformName} comes last, after {@code isTcp}.
     *
     * @return a new instance carrying exactly the supplied values
     */
    public static RequestSendItemMsg getInstance(String serverId, String mediaServerId, String app, String stream, String ip, int port,
                                                          String ssrc, String platformId, String channelId, Boolean isTcp, String platformName) {
        RequestSendItemMsg msg = new RequestSendItemMsg();
        msg.serverId = serverId;
        msg.mediaServerId = mediaServerId;
        msg.app = app;
        msg.stream = stream;
        msg.ip = ip;
        msg.port = port;
        msg.ssrc = ssrc;
        msg.platformId = platformId;
        msg.platformName = platformName;
        msg.channelId = channelId;
        msg.isTcp = isTcp;
        return msg;
    }

    // ---- routing ----

    public String getServerId() {
        return this.serverId;
    }

    public void setServerId(String serverId) {
        this.serverId = serverId;
    }

    public String getMediaServerId() {
        return this.mediaServerId;
    }

    public void setMediaServerId(String mediaServerId) {
        this.mediaServerId = mediaServerId;
    }

    // ---- stream identity ----

    public String getApp() {
        return this.app;
    }

    public void setApp(String app) {
        this.app = app;
    }

    public String getStream() {
        return this.stream;
    }

    public void setStream(String stream) {
        this.stream = stream;
    }

    // ---- destination ----

    public String getIp() {
        return this.ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getPort() {
        return this.port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getSsrc() {
        return this.ssrc;
    }

    public void setSsrc(String ssrc) {
        this.ssrc = ssrc;
    }

    // ---- requesting platform / channel ----

    public String getPlatformId() {
        return this.platformId;
    }

    public void setPlatformId(String platformId) {
        this.platformId = platformId;
    }

    public String getPlatformName() {
        return this.platformName;
    }

    public void setPlatformName(String platformName) {
        this.platformName = platformName;
    }

    public String getChannelId() {
        return this.channelId;
    }

    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    // ---- transport ----

    public Boolean getTcp() {
        return this.isTcp;
    }

    public void setTcp(Boolean tcp) {
        this.isTcp = tcp;
    }
}
src/main/java/com/genersoft/iot/vmp/service/bean/ResponseSendItemMsg.java
New file
@@ -0,0 +1,31 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
/**
 * redis消息:下级回复推送信息
 * @author lin
 */
public class ResponseSendItemMsg {
    private SendRtpItem sendRtpItem;
    private MediaServerItem mediaServerItem;
    public SendRtpItem getSendRtpItem() {
        return sendRtpItem;
    }
    public void setSendRtpItem(SendRtpItem sendRtpItem) {
        this.sendRtpItem = sendRtpItem;
    }
    public MediaServerItem getMediaServerItem() {
        return mediaServerItem;
    }
    public void setMediaServerItem(MediaServerItem mediaServerItem) {
        this.mediaServerItem = mediaServerItem;
    }
}
src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsg.java
New file
@@ -0,0 +1,116 @@
package com.genersoft.iot.vmp.service.bean;
/**
 * Envelope for messages exchanged between nodes over Redis.
 *
 * <p>Carries the sender and receiver ids, a type tag ({@code "req"} or {@code "res"}), a
 * command name, a serial that identifies the message, and an arbitrary content object.
 *
 * @author lin
 */
public class WvpRedisMsg {

    /** Type tag of a request. */
    private static final String REQUEST_TAG = "req";

    /** Type tag of a reply. */
    private static final String RESPONSE_TAG = "res";

    private String fromId;

    private String toId;

    /** {@code "req"} for a request, {@code "res"} for a reply. */
    private String type;

    private String cmd;

    /** Id of the message. */
    private String serial;

    private Object content;

    /**
     * Creates a message with an explicitly supplied type tag.
     */
    public static WvpRedisMsg getInstance(String fromId, String toId, String type, String cmd, String serial, String content){
        return build(type, fromId, toId, cmd, serial, content);
    }

    /**
     * Creates a fully populated request message (type {@code "req"}).
     */
    public static WvpRedisMsg getRequestInstance(String fromId, String toId, String cmd, String serial, Object content) {
        return build(REQUEST_TAG, fromId, toId, cmd, serial, content);
    }

    /**
     * Creates an otherwise empty reply message: only the type ({@code "res"}) is set.
     */
    public static WvpRedisMsg getResponseInstance() {
        WvpRedisMsg reply = new WvpRedisMsg();
        reply.setType(RESPONSE_TAG);
        return reply;
    }

    /**
     * Creates a fully populated reply message (type {@code "res"}).
     */
    public static WvpRedisMsg getResponseInstance(String fromId, String toId, String cmd, String serial, Object content) {
        return build(RESPONSE_TAG, fromId, toId, cmd, serial, content);
    }

    /**
     * Tells whether the given message carries the request type tag.
     *
     * @param wvpRedisMsg the message to inspect; must not be {@code null}
     * @return {@code true} when the message type is {@code "req"}
     */
    public static boolean isRequest(WvpRedisMsg wvpRedisMsg) {
        String actualType = wvpRedisMsg.getType();
        return REQUEST_TAG.equals(actualType);
    }

    /** Shared construction path of the public factories. */
    private static WvpRedisMsg build(String type, String fromId, String toId, String cmd, String serial, Object content) {
        WvpRedisMsg msg = new WvpRedisMsg();
        msg.setType(type);
        msg.setFromId(fromId);
        msg.setToId(toId);
        msg.setCmd(cmd);
        msg.setSerial(serial);
        msg.setContent(content);
        return msg;
    }

    public String getSerial() {
        return this.serial;
    }

    public void setSerial(String serial) {
        this.serial = serial;
    }

    public String getFromId() {
        return this.fromId;
    }

    public void setFromId(String fromId) {
        this.fromId = fromId;
    }

    public String getToId() {
        return this.toId;
    }

    public void setToId(String toId) {
        this.toId = toId;
    }

    public String getType() {
        return this.type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getCmd() {
        return this.cmd;
    }

    public void setCmd(String cmd) {
        this.cmd = cmd;
    }

    public Object getContent() {
        return this.content;
    }

    public void setContent(Object content) {
        this.content = content;
    }
}
src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java
New file
@@ -0,0 +1,12 @@
package com.genersoft.iot.vmp.service.bean;
/**
 * Command names carried in the {@code cmd} field of a WvpRedisMsg.
 *
 * <p>These are compile-time constants so they can be used as {@code case} labels.
 *
 * @author lin
 */
public class WvpRedisMsgCmd {
    // Request/reply pair whose request content is a RequestSendItemMsg.
    public static final String GET_SEND_ITEM = "GetSendItem";
    // Request/reply pair whose request content is a RequestPushStreamMsg.
    public static final String REQUEST_PUSH_STREAM = "RequestPushStream";
}
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java
New file
@@ -0,0 +1,377 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
 * 监听下级发送推送信息,并发送国标推流消息上级
 * @author lin
 */
@Component
public class RedisGbPlayMsgListener implements MessageListener {
    private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class);
    public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM";
    /**
     * 流媒体不存在的错误码
     */
    public static final  int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1;
    /**
     * 离线的错误码
     */
    public static final  int ERROR_CODE_OFFLINE = -2;
    /**
     * 超时的错误码
     */
    public static final  int ERROR_CODE_TIMEOUT = -3;
    private Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>();
    private Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>();
    private Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>();
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private RedisUtil redis;
    @Autowired
    private ZLMMediaListManager zlmMediaListManager;
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private ZLMMediaListManager mediaListManager;
    @Autowired
    private ZLMHttpHookSubscribe subscribe;
    /**
     * Success callback for a GET_SEND_ITEM request: receives the reply payload when a
     * reply with code 0 arrives for the request's serial.
     */
    public interface PlayMsgCallback{
        void handler(ResponseSendItemMsg responseSendItemMsg);
    }
    /**
     * Success callback for a REQUEST_PUSH_STREAM request: receives the reply's data object
     * when a reply with code 0 arrives for the request's serial.
     */
    public interface PlayMsgCallbackForStartSendRtpStream{
        void handler(JSONObject jsonObject);
    }
    /**
     * Error callback shared by both request kinds: receives the reply when its code is one
     * of the ERROR_CODE_* constants of the enclosing class.
     */
    public interface PlayMsgErrorCallback{
        void handler(WVPResult wvpResult);
    }
    @Override
    public void onMessage(Message message, byte[] bytes) {
        JSONObject msgJSON = JSON.parseObject(message.getBody(), JSONObject.class);
        WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class);
        if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
            return;
        }
        if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
            logger.info("[收到REDIS通知] 请求: {}", new String(message.getBody()));
            switch (wvpRedisMsg.getCmd()){
                case WvpRedisMsgCmd.GET_SEND_ITEM:
                    RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class);
                    requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
                    break;
                case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
                    RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);;
                    requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
                    break;
                default:
                    break;
            }
        }else {
            logger.info("[收到REDIS通知] 回复: {}", new String(message.getBody()));
            switch (wvpRedisMsg.getCmd()){
                case WvpRedisMsgCmd.GET_SEND_ITEM:
                    WVPResult content  = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class);
                    String key = wvpRedisMsg.getSerial();
                    switch (content.getCode()) {
                        case 0:
                            ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class);
                            PlayMsgCallback playMsgCallback = callbacks.get(key);
                            if (playMsgCallback != null) {
                                callbacksForError.remove(key);
                                playMsgCallback.handler(responseSendItemMsg);
                            }
                            break;
                        case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
                        case ERROR_CODE_OFFLINE:
                        case ERROR_CODE_TIMEOUT:
                            PlayMsgErrorCallback errorCallback = callbacksForError.get(key);
                            if (errorCallback != null) {
                                callbacks.remove(key);
                                errorCallback.handler(content);
                            }
                            break;
                        default:
                            break;
                    }
                    break;
                case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
                    WVPResult wvpResult  = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class);
                    String serial = wvpRedisMsg.getSerial();
                    switch (wvpResult.getCode()) {
                        case 0:
                            JSONObject jsonObject = (JSONObject)wvpResult.getData();
                            PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
                            if (playMsgCallback != null) {
                                callbacksForError.remove(serial);
                                playMsgCallback.handler(jsonObject);
                            }
                            break;
                        case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
                        case ERROR_CODE_OFFLINE:
                        case ERROR_CODE_TIMEOUT:
                            PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);
                            if (errorCallback != null) {
                                callbacks.remove(serial);
                                errorCallback.handler(wvpResult);
                            }
                            break;
                        default:
                            break;
                    }
                    break;
                default:
                    break;
            }
        }
    }
    /**
     * 处理收到的请求推流的请求
     */
    private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) {
        MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
        if (mediaInfo == null) {
            // TODO 回复错误
            return;
        }
        String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1";
        Map<String, Object> param = new HashMap<>();
        param.put("vhost","__defaultVhost__");
        param.put("app",requestPushStreamMsg.getApp());
        param.put("stream",requestPushStreamMsg.getStream());
        param.put("ssrc", requestPushStreamMsg.getSsrc());
        param.put("dst_url",requestPushStreamMsg.getIp());
        param.put("dst_port", requestPushStreamMsg.getPort());
        param.put("is_udp", is_Udp);
        param.put("src_port", requestPushStreamMsg.getSrcPort());
        param.put("pt", requestPushStreamMsg.getPt());
        param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0");
        param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0");
        JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
        // 回复消息
        responsePushStream(jsonObject, fromId, serial);
    }
    private void responsePushStream(JSONObject content, String toId, String serial) {
        WVPResult<JSONObject> result = new WVPResult<>();
        result.setCode(0);
        result.setData(content);
        WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
                WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result);
        JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
        redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
    /**
     * 处理收到的请求sendItem的请求
     */
    private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) {
        MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId());
        if (mediaServerItem == null) {
            logger.info("[回复推流信息] 流媒体{}不存在 ", content.getMediaServerId());
            WVPResult<SendRtpItem> result = new WVPResult<>();
            result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND);
            result.setMsg("流媒体不存在");
            WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
                    WvpRedisMsgCmd.GET_SEND_ITEM, serial, result);
            JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
            redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
            return;
        }
        // 确定流是否在线
        boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream());
        if (streamReady) {
            logger.info("[回复推流信息]  {}/{}", content.getApp(), content.getStream());
            responseSendItem(mediaServerItem, content, toId, serial);
        }else {
            // 流已经离线
            // 发送redis消息以使设备上线
            logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流",content.getApp(), content.getStream());
            String taskKey = UUID.randomUUID().toString();
            // 设置超时
            dynamicTask.startDelay(taskKey, ()->{
                logger.info("[ app={}, stream={} ] 等待设备开始推流超时", content.getApp(), content.getStream());
                WVPResult<SendRtpItem> result = new WVPResult<>();
                result.setCode(ERROR_CODE_TIMEOUT);
                WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
                        userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result
                );
                JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
                redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
            }, userSetting.getPlatformPlayTimeout());
            // 添加订阅
            JSONObject subscribeKey = new JSONObject();
            subscribeKey.put("app", content.getApp());
            subscribeKey.put("stream", content.getStream());
            subscribeKey.put("regist", true);
            subscribeKey.put("schema", "rtmp");
            subscribeKey.put("mediaServerId", mediaServerItem.getId());
            subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
                    (MediaServerItem mediaServerItemInUse, JSONObject json)->{
                        dynamicTask.stop(taskKey);
                        responseSendItem(mediaServerItem, content, toId, serial);
                    });
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(),
                    content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(),
                    content.getMediaServerId());
            redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
        }
    }
    /**
     * 将获取到的sendItem发送出去
     */
    private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) {
        SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
                content.getPort(), content.getSsrc(), content.getPlatformId(),
                content.getApp(), content.getStream(), content.getChannelId(),
                content.getTcp());
        WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
        result.setCode(0);
        ResponseSendItemMsg responseSendItemMsg = new ResponseSendItemMsg();
        responseSendItemMsg.setSendRtpItem(sendRtpItem);
        responseSendItemMsg.setMediaServerItem(mediaServerItem);
        result.setData(responseSendItemMsg);
        WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
                userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result
        );
        JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
        redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
    /**
     * 发送消息要求下级生成推流信息
     * @param serverId 下级服务ID
     * @param app 应用名
     * @param stream 流ID
     * @param ip 目标IP
     * @param port 目标端口
     * @param ssrc  ssrc
     * @param platformId 平台国标编号
     * @param channelId 通道ID
     * @param isTcp 是否使用TCP
     * @param callback 得到信息的回调
     */
    public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc,
                        String platformId, String channelId, boolean isTcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) {
        RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance(
                serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, platformName);
        requestSendItemMsg.setServerId(serverId);
        String key = UUID.randomUUID().toString();
        WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM,
                key, requestSendItemMsg);
        JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
        logger.info("[请求推流SendItem] {}: {}", serverId, jsonObject);
        callbacks.put(key, callback);
        callbacksForError.put(key, errorCallback);
        dynamicTask.startDelay(key, ()->{
            callbacks.remove(key);
            callbacksForError.remove(key);
            WVPResult<Object> wvpResult = new WVPResult<>();
            wvpResult.setCode(ERROR_CODE_TIMEOUT);
            wvpResult.setMsg("timeout");
            errorCallback.handler(wvpResult);
        }, userSetting.getPlatformPlayTimeout());
        redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
    /**
     * 发送请求推流的消息
     * @param param 推流参数
     * @param callback 回调
     */
    public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) {
        String key = UUID.randomUUID().toString();
        WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
                WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, param);
        JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
        logger.info("[REDIS 请求其他平台推流] {}: {}", serverId, jsonObject);
        dynamicTask.startDelay(key, ()->{
            callbacksForStartSendRtpStream.remove(key);
            callbacksForError.remove(key);
        }, userSetting.getPlatformPlayTimeout());
        callbacksForStartSendRtpStream.put(key, callback);
        callbacksForError.put(key, (wvpResult)->{
            logger.info("[REDIS 请求其他平台推流] 失败: {}", wvpResult.getMsg());
            callbacksForStartSendRtpStream.remove(key);
            callbacksForError.remove(key);
        });
        redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java
File was renamed from src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java
@@ -3,6 +3,7 @@
import com.alibaba.fastjson.JSON;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -10,17 +11,23 @@
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
/**
 * 接收来自redis的GPS更新通知
 * @author lin
 */
@Component
public class RedisGPSMsgListener implements MessageListener {
public class RedisGpsMsgListener implements MessageListener {
    private final static Logger logger = LoggerFactory.getLogger(RedisGPSMsgListener.class);
    private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class);
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Override
    public void onMessage(Message message, byte[] bytes) {
        logger.info("收到来自REDIS的GPS通知: {}", new String(message.getBody()));
    public void onMessage(@NotNull Message message, byte[] bytes) {
        if (logger.isDebugEnabled()) {
            logger.debug("收到来自REDIS的GPS通知: {}", new String(message.getBody()));
        }
        GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class);
        redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
    }
src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java
New file
@@ -0,0 +1,83 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
/**
 * @author lin
 */
@Component
public class RedisStreamMsgListener implements MessageListener {
    private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class);
    @Autowired
    private ISIPCommander commander;
    @Autowired
    private ISIPCommanderForPlatform commanderForPlatform;
    @Autowired
    private IVideoManagerStorage storage;
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private ZLMMediaListManager zlmMediaListManager;
    @Override
    public void onMessage(Message message, byte[] bytes) {
        JSONObject steamMsgJson = JSON.parseObject(message.getBody(), JSONObject.class);
        if (steamMsgJson == null) {
            logger.warn("[REDIS的ALARM通知]消息解析失败");
            return;
        }
        String serverId = steamMsgJson.getString("serverId");
        if (userSetting.getServerId().equals(serverId)) {
            // 自己发送的消息忽略即可
            return;
        }
        logger.info("[REDIS通知] 流变化: {}", new String(message.getBody()));
        String app = steamMsgJson.getString("app");
        String stream = steamMsgJson.getString("stream");
        boolean register = steamMsgJson.getBoolean("register");
        String mediaServerId = steamMsgJson.getString("mediaServerId");
        MediaItem mediaItem = new MediaItem();
        mediaItem.setSeverId(serverId);
        mediaItem.setApp(app);
        mediaItem.setStream(stream);
        mediaItem.setRegist(register);
        mediaItem.setMediaServerId(mediaServerId);
        mediaItem.setCreateStamp(System.currentTimeMillis()/1000);
        mediaItem.setAliveSecond(0L);
        mediaItem.setTotalReaderCount("0");
        mediaItem.setOriginType(0);
        mediaItem.setOriginTypeStr("0");
        mediaItem.setOriginTypeStr("unknown");
        zlmMediaListManager.addPush(mediaItem);
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -107,6 +107,7 @@
        streamPushItem.setStatus(true);
        streamPushItem.setStreamType("push");
        streamPushItem.setVhost(item.getVhost());
        streamPushItem.setServerId(item.getSeverId());
        return streamPushItem;
    }
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java
@@ -357,6 +357,15 @@
    /**
     * Gets a single push stream.
     *
     * @param app    application name
     * @param stream stream ID
     * @return the push-stream item for the given app and stream
     *         (NOTE(review): presumably {@code null} when none exists; confirm against the implementation)
     */
    StreamPushItem getMedia(String app, String stream);
    /**
     * Clears the push-stream list.
     */
    void clearMediaList();
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
@@ -14,9 +14,9 @@
public interface StreamPushMapper {
    // Inserts a row into stream_push using the fields of the given StreamPushItem.
    // NOTE(review): every value is spliced into the statement through a quoted ${...} placeholder.
    // If this is a MyBatis mapper, ${...} is plain text substitution rather than a bound
    // parameter; confirm whether #{...} should be used for values such as app and stream.
    @Insert("INSERT INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " +
            "createStamp, aliveSecond, mediaServerId) VALUES" +
            "createStamp, aliveSecond, mediaServerId, serverId) VALUES" +
            "('${app}', '${stream}', '${totalReaderCount}', '${originType}', '${originTypeStr}', " +
            "'${createStamp}', '${aliveSecond}', '${mediaServerId}' )")
            "'${createStamp}', '${aliveSecond}', '${mediaServerId}' , '${serverId}' )")
    int add(StreamPushItem streamPushItem);
    @Update("UPDATE stream_push " +
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -587,8 +587,8 @@
        String scanKey = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetting.getServerId() + "_*";
        List<GPSMsgInfo> result = new ArrayList<>();
        List<Object> keys = redis.scan(scanKey);
        for (int i = 0; i < keys.size(); i++) {
            String key = (String) keys.get(i);
        for (Object o : keys) {
            String key = (String) o;
            GPSMsgInfo gpsMsgInfo = (GPSMsgInfo) redis.get(key);
            if (!gpsMsgInfo.isStored()) { // 只取没有存过得
                result.add((GPSMsgInfo)redis.get(key));
@@ -667,7 +667,7 @@
    /**
     * Logs and sends a "stream push requested" notification: the message is converted with
     * {@code JSON.toJSON} and passed to {@code redis.convertAndSend} under the key
     * {@code VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED}.
     *
     * @param msg the notification to send; its app and stream are included in the log line
     */
    @Override
    public void sendStreamPushRequestedMsg(MessageForPushChannel msg) {
        String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED;
        logger.info("[redis 推流被请求通知] {}: {}-{}", key, msg.getApp(), msg.getStream());
        logger.info("[redis 推流被请求通知] {}: {}/{}", key, msg.getApp(), msg.getStream());
        redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg));
    }
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
@@ -885,6 +885,11 @@
    }
    /**
     * Returns the push-stream item for the given app and stream by delegating to
     * {@code streamPushMapper.selectOne}.
     */
    @Override
    public StreamPushItem getMedia(String app, String stream) {
        return streamPushMapper.selectOne(app, stream);
    }
    /**
     * Clears the push-stream list by delegating to {@code streamPushMapper.clear}.
     */
    @Override
    public void clearMediaList() {
        streamPushMapper.clear();
    }
web_src/src/components/dialog/rtcPlayer.vue
@@ -7,11 +7,11 @@
</template>
<script>
let webrtcPlayer = null;
export default {
    name: 'rtcPlayer',
    data() {
        return {
            webrtcPlayer: null,
            timer: null
        };
    },
@@ -35,7 +35,7 @@
    },
    methods: {
        play: function (url) {
            this.webrtcPlayer = new ZLMRTCClient.Endpoint({
            webrtcPlayer = new ZLMRTCClient.Endpoint({
                element: document.getElementById('webRtcPlayerBox'),// video 标签
                debug: true,// 是否打印日志
                zlmsdpUrl: url,//流地址
@@ -45,17 +45,17 @@
                videoEnable: false,
                recvOnly: true,
            })
            this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ICE_CANDIDATE_ERROR,(e)=>{// ICE 协商出错
            webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ICE_CANDIDATE_ERROR,(e)=>{// ICE 协商出错
                console.error('ICE 协商出错')
                this.eventcallbacK("ICE ERROR", "ICE 协商出错")
            });
            this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_REMOTE_STREAMS,(e)=>{//获取到了远端流,可以播放
            webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_REMOTE_STREAMS,(e)=>{//获取到了远端流,可以播放
                console.error('播放成功',e.streams)
                this.eventcallbacK("playing", "播放成功")
            });
            this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_OFFER_ANWSER_EXCHANGE_FAILED,(e)=>{// offer anwser 交换失败
            webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_OFFER_ANWSER_EXCHANGE_FAILED,(e)=>{// offer anwser 交换失败
                console.error('offer anwser 交换失败',e)
                this.eventcallbacK("OFFER ANSWER ERROR ", "offer anwser 交换失败")
                if (e.code ==-400 && e.msg=="流不存在"){
@@ -68,7 +68,7 @@
                }
            });
            this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_LOCAL_STREAM,(s)=>{// 获取到了本地流
            webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_LOCAL_STREAM,(s)=>{// 获取到了本地流
                // document.getElementById('selfVideo').srcObject=s;
                this.eventcallbacK("LOCAL STREAM", "获取到了本地流")
@@ -76,9 +76,9 @@
        },
        pause: function () {
            if (this.webrtcPlayer != null) {
                this.webrtcPlayer.close();
                this.webrtcPlayer = null;
            if (webrtcPlayer != null) {
                webrtcPlayer.close();
                webrtcPlayer = null;
            }
        },