648540858
2023-06-27 f961515317a33fe965287ca5c978b85e9ce1abcc
合并主线
7个文件已修改
99 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
@@ -56,7 +56,7 @@
        //via
        ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
        ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(),
                Integer.parseInt(parentPlatform.getDevicePort()), parentPlatform.getTransport(), SipUtils.getNewViaTag());
                parentPlatform.getDevicePort(), parentPlatform.getTransport(), SipUtils.getNewViaTag());
        viaHeader.setRPort();
        viaHeaders.add(viaHeader);
        //from
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -6,9 +6,7 @@
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
@@ -624,9 +622,9 @@
        logger.info("[语音喊话] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), sendRtpItem.getPort());
        HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
        subscribe.addSubscribe(hookSubscribeForStreamChange, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
        subscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItemInUse, hookParam) -> {
            if (event != null) {
                event.response(mediaServerItemInUse, json);
                event.response(mediaServerItemInUse, hookParam);
                subscribe.removeSubscribe(hookSubscribeForStreamChange);
            }
        });
@@ -634,9 +632,9 @@
        CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport());
        callIdHeader.setCallId(callId);
        HookSubscribeForStreamPush hookSubscribeForStreamPush = HookSubscribeFactory.on_publish("rtp", stream,  null, mediaServerItem.getId());
        subscribe.addSubscribe(hookSubscribeForStreamPush, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
        subscribe.addSubscribe(hookSubscribeForStreamPush, (mediaServerItemInUse, hookParam) -> {
            if (eventForPush != null) {
                eventForPush.response(mediaServerItemInUse, json);
                eventForPush.response(mediaServerItemInUse, hookParam);
            }
        });
        //
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -19,6 +19,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -899,9 +900,9 @@
        logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
        subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
        subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
            if (event != null) {
                event.response(mediaServerItemInUse, json);
                event.response(mediaServerItemInUse, hookParam);
                subscribe.removeSubscribe(hookSubscribe);
            }
        });
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java
@@ -10,6 +10,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlatformService;
@@ -127,10 +128,9 @@
                // 消息发送成功, 向上级发送invite,获取推流
                try {
                    platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad,  (mediaServerItem, response)->{
                    platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad,  (mediaServerItem, hookParam)->{
                        OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
                        // 上级平台推流成功
                        String app = response.getString("app");
                        String stream = response.getString("stream");
                        AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(device.getDeviceId(), targetId);
                        if (broadcastCatch != null ) {
                            if (playService.audioBroadcastInUse(device, targetId)) {
@@ -138,24 +138,24 @@
                                        platform.getServerGBId(), deviceChannel.getChannelId());
                                //  查看语音通道已经建立且已经占用 回复BYE
                                try {
                                    platformService.stopBroadcast(platform, deviceChannel.getChannelId(), stream);
                                    platformService.stopBroadcast(platform, deviceChannel.getChannelId(), streamChangedHookParam.getStream());
                                } catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException |
                                         SipException e) {
                                    logger.info("[消息发送失败] 国标级联 语音喊话 platform: {}, channel: {}", platform.getServerGBId(), deviceChannel.getChannelId());
                                }
                            }else {
                                // 查看语音通道已经建立但是未占用
                                broadcastCatch.setApp(app);
                                broadcastCatch.setStream(stream);
                                broadcastCatch.setApp(streamChangedHookParam.getApp());
                                broadcastCatch.setStream(streamChangedHookParam.getStream());
                                broadcastCatch.setMediaServerItem(mediaServerItem);
                                audioBroadcastManager.update(broadcastCatch);
                                // 推流到设备
                                SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, stream, null);
                                SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, streamChangedHookParam.getStream(), null);
                                if (sendRtpItem == null) {
                                    logger.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, stream);
                                    logger.info("[国标级联] 语音喊话 重新开始,channelId: {}, stream: {}", targetId, stream);
                                    logger.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, streamChangedHookParam.getStream());
                                    logger.info("[国标级联] 语音喊话 重新开始,channelId: {}, stream: {}", targetId, streamChangedHookParam.getStream());
                                    try {
                                        playService.audioBroadcastCmd(device, targetId, mediaServerItem, app, stream, 60, true, msg -> {
                                        playService.audioBroadcastCmd(device, targetId, mediaServerItem, streamChangedHookParam.getApp(), streamChangedHookParam.getStream(), 60, true, msg -> {
                                            logger.info("[语音喊话] 通道建立成功, device: {}, channel: {}", device.getDeviceId(), targetId);
                                        });
                                    } catch (SipException | InvalidArgumentException | ParseException e) {
@@ -173,7 +173,7 @@
                            }
                        }else {
                            try {
                                playService.audioBroadcastCmd(device, targetId, mediaServerItem, app, stream, 60, true, msg -> {
                                playService.audioBroadcastCmd(device, targetId, mediaServerItem, streamChangedHookParam.getApp(), streamChangedHookParam.getStream(), 60, true, msg -> {
                                    logger.info("[语音喊话] 通道建立成功, device: {}, channel: {}", device.getDeviceId(), targetId);
                                });
                            } catch (SipException | InvalidArgumentException | ParseException e) {
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
@@ -7,6 +7,7 @@
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
@@ -28,7 +29,7 @@
              ErrorCallback<Object> callback);
    SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, ErrorCallback<Object> callback);
    StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId);
    StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId);
    MediaServerItem getNewMediaServerItem(Device device);
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.DynamicTask;
@@ -16,6 +15,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlatformService;
@@ -442,10 +442,11 @@
                    inviteStreamService.removeInviteInfo(inviteInfo);
                }else {
                    // 流确实尚在推流,直接回调结果
                    JSONObject json = new JSONObject();
                    json.put("app", inviteInfo.getStreamInfo().getApp());
                    json.put("stream", inviteInfo.getStreamInfo().getStream());
                    hookEvent.response(mediaServerItemForStreamInfo, json);
                    OnStreamChangedHookParam hookParam = new OnStreamChangedHookParam();
                    hookParam.setApp(inviteInfo.getStreamInfo().getApp());
                    hookParam.setStream(inviteInfo.getStreamInfo().getStream());
                    hookEvent.response(mediaServerItemForStreamInfo, hookParam);
                    return;
                }
            }
@@ -498,14 +499,14 @@
                }
            }
        }, userSetting.getPlayTimeout());
        commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, response)->{
        commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, hookParam)->{
            logger.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId);
            dynamicTask.stop(timeOutTaskKey);
            // hook响应
            playService.onPublishHandlerForPlay(mediaServerItemForInvite, response, platform.getServerGBId(), channelId);
            playService.onPublishHandlerForPlay(mediaServerItemForInvite, hookParam, platform.getServerGBId(), channelId);
            // 收到流
            if (hookEvent != null) {
                hookEvent.response(mediaServerItem, response);
                hookEvent.response(mediaServerItem, hookParam);
            }
        }, event -> {
            // 收到200OK 检测ssrc是否有变化,防止上级自定义了ssrc
@@ -524,30 +525,20 @@
                logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
                if (!mediaServerItem.isRtpEnable()) {
                    logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
                    if (!ssrcFactory.checkSsrc(mediaServerItem.getId(), ssrcInResponse)) {
                        // ssrc 不可用
                        // 释放ssrc
                        mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                        streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
                        event.msg = "下级自定义了ssrc,但是此ssrc不可用";
                        event.statusCode = 400;
                        errorEvent.response(event);
                        return;
                    }
                    // 释放ssrc
                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                    // 单端口模式streamId也有变化,需要重新设置监听
                    if (!mediaServerItem.isRtpEnable()) {
                        // 添加订阅
                        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
                        subscribe.removeSubscribe(hookSubscribe);
                        hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
                        subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
                            logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
                        subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
                            logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam);
                            dynamicTask.stop(timeOutTaskKey);
                            // hook响应
                            playService.onPublishHandlerForPlay(mediaServerItemInUse, response, platform.getServerGBId(), channelId);
                            hookEvent.response(mediaServerItemInUse, response);
                            playService.onPublishHandlerForPlay(mediaServerItemInUse, hookParam, platform.getServerGBId(), channelId);
                            hookEvent.response(mediaServerItemInUse, hookParam);
                        });
                    }
                    // 关闭rtp server
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -293,12 +293,12 @@
        // 查看设备是否已经在推流
        try {
            cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
                logger.info("[语音对讲] 流已生成, 开始推流: " + response.toJSONString());
            cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (mediaServerItemInuse, hookParam) -> {
                logger.info("[语音对讲] 流已生成, 开始推流: " + hookParam);
                dynamicTask.stop(timeOutTaskKey);
                // TODO 暂不做处理
            }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> {
                logger.info("[语音对讲] 设备开始推流: " + json.toJSONString());
            }, (mediaServerItemInuse, hookParam) -> {
                logger.info("[语音对讲] 设备开始推流: " + hookParam);
                dynamicTask.stop(timeOutTaskKey);
            }, (event) -> {
@@ -617,10 +617,10 @@
    }
    @Override
    public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) {
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
    public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId) {
        OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
        Device device = redisCatchStorage.getDevice(deviceId);
        OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
        if (streamInfo != null) {
            DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
            if (deviceChannel != null) {
@@ -1571,7 +1571,7 @@
            }
        }
        talk(mediaServerItem, device, channelId, stream, (MediaServerItem mediaServerItem1, JSONObject response) -> {
        talk(mediaServerItem, device, channelId, stream, (mediaServerItem1, hookParam) -> {
            logger.info("[语音对讲] 收到设备发来的流");
        }, eventResult -> {
            logger.warn("[语音对讲] 失败,{}/{}, 错误码 {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg);