648540858
2024-04-07 7d530099878dbcf474182bbbfc694ecfe74c9fbd
优化推流通知
10个文件已修改
223 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java 98 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -305,4 +305,33 @@
    public void setReceiveStream(String receiveStream) {
        this.receiveStream = receiveStream;
    }
    @Override
    public String toString() {
        return "SendRtpItem{" +
                "ip='" + ip + '\'' +
                ", port=" + port +
                ", ssrc='" + ssrc + '\'' +
                ", platformId='" + platformId + '\'' +
                ", deviceId='" + deviceId + '\'' +
                ", app='" + app + '\'' +
                ", channelId='" + channelId + '\'' +
                ", status=" + status +
                ", stream='" + stream + '\'' +
                ", tcp=" + tcp +
                ", tcpActive=" + tcpActive +
                ", localPort=" + localPort +
                ", mediaServerId='" + mediaServerId + '\'' +
                ", serverId='" + serverId + '\'' +
                ", CallId='" + CallId + '\'' +
                ", fromTag='" + fromTag + '\'' +
                ", toTag='" + toTag + '\'' +
                ", pt=" + pt +
                ", usePs=" + usePs +
                ", onlyAudio=" + onlyAudio +
                ", rtcp=" + rtcp +
                ", playType=" + playType +
                ", receiveStream='" + receiveStream + '\'' +
                '}';
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -116,7 +116,7 @@
        if (parentPlatform != null) {
            Map<String, Object> param = getSendRtpParam(sendRtpItem);
            if (mediaInfo == null) {
            if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
                RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
                        sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
                        sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -575,14 +575,20 @@
                    }
                    if ("push".equals(gbStream.getStreamType())) {
                        if (streamPushItem != null && streamPushItem.isPushIng()) {
                            // 推流状态
                            pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        } else {
                            // 未推流 拉起
                            notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        if (streamPushItem != null) {
                            // 从redis查询是否正在接收这个推流
                            OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
                            if (pushListItem != null) {
                                StreamPushItem transform = streamPushService.transform(pushListItem);
                                transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
                                // 推流状态
                                pushStream(evt, request, gbStream, transform, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            }else {
                                // 未推流 拉起
                                notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            }
                        }
                    } else if ("proxy".equals(gbStream.getStreamType())) {
                        if (null != proxyByAppAndStream) {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -509,32 +509,43 @@
                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
                    if (!sendRtpItems.isEmpty()) {
                        for (SendRtpItem sendRtpItem : sendRtpItems) {
                            if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) {
                                String platformId = sendRtpItem.getPlatformId();
                                ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
                                Device device = deviceService.getDevice(platformId);
                            if (sendRtpItem == null) {
                                continue;
                            }
                                try {
                                    if (platform != null) {
                                        commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
                                        redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
                                                sendRtpItem.getCallId(), sendRtpItem.getStream());
                                    } else {
                                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
                                        if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
                                                || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
                                            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                            if (audioBroadcastCatch != null) {
                                                // 来自上级平台的停止对讲
                                                logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                                audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                            if (sendRtpItem.getApp().equals(param.getApp())) {
                                logger.info(sendRtpItem.toString());
                                if (userSetting.getServerId().equals(sendRtpItem.getServerId())) {
                                    // 通知其他wvp停止发流
//                                    redisCatchStorage.sendRtp
                                }else {
                                    String platformId = sendRtpItem.getPlatformId();
                                    ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
                                    Device device = deviceService.getDevice(platformId);
                                    try {
                                        if (platform != null) {
                                            commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
                                            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
                                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
                                        } else {
                                            cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
                                            if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
                                                    || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
                                                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                                if (audioBroadcastCatch != null) {
                                                    // 来自上级平台的停止对讲
                                                    logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                                    audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                                }
                                            }
                                        }
                                    } catch (SipException | InvalidArgumentException | ParseException |
                                             SsrcTransactionNotFoundException e) {
                                        logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
                                    }
                                } catch (SipException | InvalidArgumentException | ParseException |
                                         SsrcTransactionNotFoundException e) {
                                    logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
                                }
                            }
                        }
                    }
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -1170,7 +1170,7 @@
            dynamicTask.startDelay(key, ()->{
                logger.info("[语音广播]等待invite消息超时:{}/{}", device.getDeviceId(), channelId);
                stopAudioBroadcast(device.getDeviceId(), channelId);
            }, 2000);
            }, 10*1000);
        }, eventResultForError -> {
            // 发送失败
            logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg);
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -8,7 +8,6 @@
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
@@ -25,7 +24,6 @@
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -333,8 +331,6 @@
            result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
                    param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
        }
        System.out.println("addStreamProxyToZlm====");
        System.out.println(result);
        if (result != null && result.getInteger("code") == 0) {
            JSONObject data = result.getJSONObject("data");
            if (data == null) {
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
@@ -1,11 +1,7 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -41,52 +37,52 @@
    @Override
    public void onMessage(Message message, byte[] bytes) {
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                        JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
                        if (steamMsgJson == null) {
                            logger.warn("[收到redis 流变化]消息解析失败");
                            continue;
                        }
                        String serverId = steamMsgJson.getString("serverId");
                        if (userSetting.getServerId().equals(serverId)) {
                            // 自己发送的消息忽略即可
                            continue;
                        }
                        logger.info("[收到redis 流变化]: {}", new String(message.getBody()));
                        String app = steamMsgJson.getString("app");
                        String stream = steamMsgJson.getString("stream");
                        boolean register = steamMsgJson.getBoolean("register");
                        String mediaServerId = steamMsgJson.getString("mediaServerId");
                        OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
                        onStreamChangedHookParam.setSeverId(serverId);
                        onStreamChangedHookParam.setApp(app);
                        onStreamChangedHookParam.setStream(stream);
                        onStreamChangedHookParam.setRegist(register);
                        onStreamChangedHookParam.setMediaServerId(mediaServerId);
                        onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
                        onStreamChangedHookParam.setAliveSecond(0L);
                        onStreamChangedHookParam.setTotalReaderCount("0");
                        onStreamChangedHookParam.setOriginType(0);
                        onStreamChangedHookParam.setOriginTypeStr("0");
                        onStreamChangedHookParam.setOriginTypeStr("unknown");
                        if (register) {
                            zlmMediaListManager.addPush(onStreamChangedHookParam);
                        }else {
                            zlmMediaListManager.removeMedia(app, stream);
                        }
                    }catch (Exception e) {
                        logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
                        logger.error("[REDIS消息-流变化] 异常内容: ", e);
                    }
                }
            });
        }
//        boolean isEmpty = taskQueue.isEmpty();
//        taskQueue.offer(message);
//        if (isEmpty) {
//            taskExecutor.execute(() -> {
//                while (!taskQueue.isEmpty()) {
//                    Message msg = taskQueue.poll();
//                    try {
//                        JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
//                        if (steamMsgJson == null) {
//                            logger.warn("[收到redis 流变化]消息解析失败");
//                            continue;
//                        }
//                        String serverId = steamMsgJson.getString("serverId");
//
//                        if (userSetting.getServerId().equals(serverId)) {
//                            // 自己发送的消息忽略即可
//                            continue;
//                        }
//                        logger.info("[收到redis 流变化]: {}", new String(message.getBody()));
//                        String app = steamMsgJson.getString("app");
//                        String stream = steamMsgJson.getString("stream");
//                        boolean register = steamMsgJson.getBoolean("register");
//                        String mediaServerId = steamMsgJson.getString("mediaServerId");
//                        OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
//                        onStreamChangedHookParam.setSeverId(serverId);
//                        onStreamChangedHookParam.setApp(app);
//                        onStreamChangedHookParam.setStream(stream);
//                        onStreamChangedHookParam.setRegist(register);
//                        onStreamChangedHookParam.setMediaServerId(mediaServerId);
//                        onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
//                        onStreamChangedHookParam.setAliveSecond(0L);
//                        onStreamChangedHookParam.setTotalReaderCount("0");
//                        onStreamChangedHookParam.setOriginType(0);
//                        onStreamChangedHookParam.setOriginTypeStr("0");
//                        onStreamChangedHookParam.setOriginTypeStr("unknown");
//                        if (register) {
//                            zlmMediaListManager.addPush(onStreamChangedHookParam);
//                        }else {
//                            zlmMediaListManager.removeMedia(app, stream);
//                        }
//                    }catch (Exception e) {
//                        logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
//                        logger.error("[REDIS消息-流变化] 异常内容: ", e);
//                    }
//                }
//            });
//        }
    }
}
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -211,5 +211,7 @@
    void addPushListItem(String app, String stream, OnStreamChangedHookParam param);
    /**
     * Looks up the cached push-stream entry for the given application and stream.
     *
     * @param app    application name of the pushed stream
     * @param stream stream id of the pushed stream
     * @return the cached entry; NOTE(review): callers appear to treat {@code null}
     *         as "no such push stream is currently recorded" — confirm against the
     *         implementation
     */
    OnStreamChangedHookParam getPushListItem(String app, String stream);
    void removePushListItem(String app, String stream, String mediaServerId);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -657,6 +657,12 @@
    }
    @Override
    public OnStreamChangedHookParam getPushListItem(String app, String stream) {
        String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream;
        return (OnStreamChangedHookParam)redisTemplate.opsForValue().get(key);
    }
    @Override
    public void removePushListItem(String app, String stream, String mediaServerId) {
        String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream;
        OnStreamChangedHookParam param = (OnStreamChangedHookParam)redisTemplate.opsForValue().get(key);
src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java
@@ -1,12 +1,8 @@
package com.genersoft.iot.vmp.vmanager.cloudRecord;
import com.alibaba.fastjson2.JSONArray;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.ICloudRecordService;
import com.genersoft.iot.vmp.service.IMediaServerService;
@@ -22,7 +18,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;