648540858
2023-06-29 f62bf7b2c6239f2c67f5d9019f8302c8d441f870
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
package com.genersoft.iot.vmp.service.redisMsg;
 
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
 
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
 
 
/**
 * 监听下级发送推送信息,并发送国标推流消息上级
 * @author lin
 */
@Component
public class RedisGbPlayMsgListener implements MessageListener {
 
    private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class);
 
    public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM";
 
    /**
     * 流媒体不存在的错误玛
     */
    public static final  int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1;
 
    /**
     * 离线的错误玛
     */
    public static final  int ERROR_CODE_OFFLINE = -2;
 
    /**
     * 超时的错误玛
     */
    public static final  int ERROR_CODE_TIMEOUT = -3;
 
    private Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>();
    private Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>();
    private Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>();
 
    @Autowired
    private UserSetting userSetting;
 
 
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
 
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
 
    @Autowired
    private IMediaServerService mediaServerService;
 
 
    @Autowired
    private DynamicTask dynamicTask;
 
 
    @Autowired
    private ZlmHttpHookSubscribe subscribe;
 
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
 
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
 
 
    public interface PlayMsgCallback{
        void handler(ResponseSendItemMsg responseSendItemMsg) throws ParseException;
    }
 
    public interface PlayMsgCallbackForStartSendRtpStream{
        void handler(JSONObject jsonObject);
    }
 
    public interface PlayMsgErrorCallback{
        void handler(WVPResult wvpResult);
    }
 
    @Override
    public void onMessage(Message message, byte[] bytes) {
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                        JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class);
                        WvpRedisMsg wvpRedisMsg = JSON.to(WvpRedisMsg.class, msgJSON);
                        if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
                            continue;
                        }
                        if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
                            logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody()));
 
                            switch (wvpRedisMsg.getCmd()){
                                case WvpRedisMsgCmd.GET_SEND_ITEM:
                                    RequestSendItemMsg content = JSON.to(RequestSendItemMsg.class, wvpRedisMsg.getContent());
                                    requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
                                    break;
                                case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
                                    RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
                                    requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
 
                                    break;
                                default:
                                    break;
                            }
 
                        }else {
                            logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody()));
                            switch (wvpRedisMsg.getCmd()){
                                case WvpRedisMsgCmd.GET_SEND_ITEM:
 
                                   WVPResult content  = JSON.to(WVPResult.class, wvpRedisMsg.getContent());
 
                                    String key = wvpRedisMsg.getSerial();
                                    switch (content.getCode()) {
                                        case 0:
                                           ResponseSendItemMsg responseSendItemMsg =JSON.to(ResponseSendItemMsg.class, content.getData());
                                            PlayMsgCallback playMsgCallback = callbacks.get(key);
                                            if (playMsgCallback != null) {
                                                callbacksForError.remove(key);
                                                try {
                                                    playMsgCallback.handler(responseSendItemMsg);
                                                } catch (ParseException e) {
                                                    logger.error("[REDIS消息处理异常] ", e);
                                                }
                                            }
                                            break;
                                        case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
                                        case ERROR_CODE_OFFLINE:
                                        case ERROR_CODE_TIMEOUT:
                                            PlayMsgErrorCallback errorCallback = callbacksForError.get(key);
                                            if (errorCallback != null) {
                                                callbacks.remove(key);
                                                errorCallback.handler(content);
                                            }
                                            break;
                                        default:
                                            break;
                                    }
                                    break;
                                case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
                                    WVPResult wvpResult  = JSON.to(WVPResult.class, wvpRedisMsg.getContent());
                                    String serial = wvpRedisMsg.getSerial();
                                    switch (wvpResult.getCode()) {
                                        case 0:
                                            JSONObject jsonObject = (JSONObject)wvpResult.getData();
                                            PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
                                            if (playMsgCallback != null) {
                                                callbacksForError.remove(serial);
                                                playMsgCallback.handler(jsonObject);
                                            }
                                            break;
                                        case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
                                        case ERROR_CODE_OFFLINE:
                                        case ERROR_CODE_TIMEOUT:
                                            PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);
                                            if (errorCallback != null) {
                                                callbacks.remove(serial);
                                                errorCallback.handler(wvpResult);
                                            }
                                            break;
                                        default:
                                            break;
                                    }
                                    break;
                                default:
                                    break;
                            }
 
                        }
                    }catch (Exception e) {
                        logger.warn("[RedisGbPlayMsg] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
                        logger.error("[RedisGbPlayMsg] 异常内容: ", e);
                    }
                }
            });
        }
    }
 
    /**
     * 处理收到的请求推流的请求
     */
    private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) {
        MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
        if (mediaInfo == null) {
            // TODO 回复错误
            return;
        }
        String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1";
        Map<String, Object> param = new HashMap<>();
        param.put("vhost","__defaultVhost__");
        param.put("app",requestPushStreamMsg.getApp());
        param.put("stream",requestPushStreamMsg.getStream());
        param.put("ssrc", requestPushStreamMsg.getSsrc());
        param.put("dst_url",requestPushStreamMsg.getIp());
        param.put("dst_port", requestPushStreamMsg.getPort());
        param.put("is_udp", is_Udp);
        param.put("src_port", requestPushStreamMsg.getSrcPort());
        param.put("pt", requestPushStreamMsg.getPt());
        param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0");
        param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0");
        JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
        // 回复消息
        responsePushStream(jsonObject, fromId, serial);
    }
 
    private void responsePushStream(JSONObject content, String toId, String serial) {
 
        WVPResult<JSONObject> result = new WVPResult<>();
        result.setCode(0);
        result.setData(content);
 
        WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
                WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result);
        JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
 
    /**
     * 处理收到的请求sendItem的请求
     */
    private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) {
        MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId());
        if (mediaServerItem == null) {
            logger.info("[回复推流信息] 流媒体{}不存在 ", content.getMediaServerId());
 
            WVPResult<SendRtpItem> result = new WVPResult<>();
            result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND);
            result.setMsg("流媒体不存在");
 
            WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
                    WvpRedisMsgCmd.GET_SEND_ITEM, serial, result);
 
            JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
            redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
            return;
        }
        // 确定流是否在线
        Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream());
        if (streamReady != null && streamReady) {
            logger.info("[回复推流信息]  {}/{}", content.getApp(), content.getStream());
            responseSendItem(mediaServerItem, content, toId, serial);
        }else {
            // 流已经离线
            // 发送redis消息以使设备上线
            logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流",content.getApp(), content.getStream());
 
            String taskKey = UUID.randomUUID().toString();
            // 设置超时
            dynamicTask.startDelay(taskKey, ()->{
                logger.info("[ app={}, stream={} ] 等待设备开始推流超时", content.getApp(), content.getStream());
                WVPResult<SendRtpItem> result = new WVPResult<>();
                result.setCode(ERROR_CODE_TIMEOUT);
                WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
                        userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result
                );
                JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
                redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
            }, userSetting.getPlatformPlayTimeout());
 
            // 添加订阅
            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtsp", mediaServerItem.getId());
 
            subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam)->{
                        dynamicTask.stop(taskKey);
                        responseSendItem(mediaServerItem, content, toId, serial);
                    });
 
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(),
                    content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(),
                    content.getMediaServerId());
 
            String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED;
            logger.info("[redis发送通知] 推流被请求 {}: {}/{}", key, messageForPushChannel.getApp(), messageForPushChannel.getStream());
            redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel));
        }
    }
 
    /**
     * 将获取到的sendItem发送出去
     */
    private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) {
        SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
                content.getPort(), content.getSsrc(), content.getPlatformId(),
                content.getApp(), content.getStream(), content.getChannelId(),
                content.getTcp(), content.getRtcp(), ssrcFromCallback -> {
                    return querySendRTPServer(content.getPlatformId(), content.getChannelId(), content.getStream(), null) != null;
                });
 
        WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
        result.setCode(0);
        ResponseSendItemMsg responseSendItemMsg = new ResponseSendItemMsg();
        responseSendItemMsg.setSendRtpItem(sendRtpItem);
        responseSendItemMsg.setMediaServerItem(mediaServerItem);
        result.setData(responseSendItemMsg);
 
        WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
                userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result
        );
        JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
 
    /**
     * 发送消息要求下级生成推流信息
     * @param serverId 下级服务ID
     * @param app 应用名
     * @param stream 流ID
     * @param ip 目标IP
     * @param port 目标端口
     * @param ssrc  ssrc
     * @param platformId 平台国标编号
     * @param channelId 通道ID
     * @param isTcp 是否使用TCP
     * @param callback 得到信息的回调
     */
    public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc,
                        String platformId, String channelId, boolean isTcp, boolean rtcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) {
        RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance(
                serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, rtcp, platformName);
        requestSendItemMsg.setServerId(serverId);
        String key = UUID.randomUUID().toString();
        WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM,
                key, requestSendItemMsg);
 
        JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
        logger.info("[请求推流SendItem] {}: {}", serverId, jsonObject);
        callbacks.put(key, callback);
        callbacksForError.put(key, errorCallback);
        dynamicTask.startDelay(key, ()->{
            callbacks.remove(key);
            callbacksForError.remove(key);
            WVPResult<Object> wvpResult = new WVPResult<>();
            wvpResult.setCode(ERROR_CODE_TIMEOUT);
            wvpResult.setMsg("timeout");
            errorCallback.handler(wvpResult);
        }, userSetting.getPlatformPlayTimeout());
        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
 
    /**
     * 发送请求推流的消息
     * @param param 推流参数
     * @param callback 回调
     */
    public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) {
        String key = UUID.randomUUID().toString();
        WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
                WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, param);
 
        JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
        logger.info("[REDIS 请求其他平台推流] {}: {}", serverId, jsonObject);
        dynamicTask.startDelay(key, ()->{
            callbacksForStartSendRtpStream.remove(key);
            callbacksForError.remove(key);
        }, userSetting.getPlatformPlayTimeout());
        callbacksForStartSendRtpStream.put(key, callback);
        callbacksForError.put(key, (wvpResult)->{
            logger.info("[REDIS 请求其他平台推流] 失败: {}", wvpResult.getMsg());
            callbacksForStartSendRtpStream.remove(key);
            callbacksForError.remove(key);
        });
        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
 
    private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
        if (platformGbId == null) {
            platformGbId = "*";
        }
        if (channelId == null) {
            channelId = "*";
        }
        if (streamId == null) {
            streamId = "*";
        }
        if (callId == null) {
            callId = "*";
        }
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
                + userSetting.getServerId() + "_*_"
                + platformGbId + "_"
                + channelId + "_"
                + streamId + "_"
                + callId;
        List<Object> scan = RedisUtil.scan(redisTemplate, key);
        if (scan.size() > 0) {
            return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0));
        }else {
            return null;
        }
    }
}