648540858
2024-04-02 8b90fade9eb3a62b428f23f2306cb1911c98d355
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
package com.genersoft.iot.vmp.media.abl;
 
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.abl.bean.AblServerConfig;
import com.genersoft.iot.vmp.media.abl.bean.ConfigKeyId;
import com.genersoft.iot.vmp.media.abl.event.HookAblServerKeepaliveEvent;
import com.genersoft.iot.vmp.media.abl.event.HookAblServerStartEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
 
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * 管理zlm流媒体节点的状态
 */
@Component
public class ABLMediaServerStatusManger {
 
    private final static Logger logger = LoggerFactory.getLogger(ABLMediaServerStatusManger.class);
 
    private final Map<Object, MediaServer> offlineABLPrimaryMap = new ConcurrentHashMap<>();
    private final Map<Object, MediaServer> offlineAblsecondaryMap = new ConcurrentHashMap<>();
    private final Map<Object, Long> offlineAblTimeMap = new ConcurrentHashMap<>();
 
    @Autowired
    private ABLRESTfulUtils ablResTfulUtils;
 
    @Autowired
    private IMediaServerService mediaServerService;
 
    @Autowired
    private DynamicTask dynamicTask;
 
    @Value("${server.ssl.enabled:false}")
    private boolean sslEnabled;
 
    @Value("${server.port}")
    private Integer serverPort;
 
    @Autowired
    private UserSetting userSetting;
 
    private final String type = "abl";
 
    @Async("taskExecutor")
    @EventListener
    public void onApplicationEvent(MediaServerChangeEvent event) {
        if (event.getMediaServerItemList() == null
                || event.getMediaServerItemList().isEmpty()) {
            return;
        }
        for (MediaServer mediaServerItem : event.getMediaServerItemList()) {
            if (!type.equals(mediaServerItem.getType())) {
                continue;
            }
            logger.info("[ABL-添加待上线节点] ID:" + mediaServerItem.getId());
            offlineABLPrimaryMap.put(mediaServerItem.getId(), mediaServerItem);
            offlineAblTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis());
        }
    }
 
    @Async("taskExecutor")
    @EventListener
    public void onApplicationEvent(HookAblServerStartEvent event) {
        if (event.getMediaServerItem() == null
                || !type.equals(event.getMediaServerItem().getType())
                || event.getMediaServerItem().isStatus()) {
            return;
        }
        MediaServer serverItem = mediaServerService.getOne(event.getMediaServerItem().getId());
        if (serverItem == null) {
            return;
        }
        logger.info("[ABL-HOOK事件-服务启动] ID:" + event.getMediaServerItem().getId());
        online(serverItem, null);
    }
 
    @Async("taskExecutor")
    @EventListener
    public void onApplicationEvent(HookAblServerKeepaliveEvent event) {
        if (event.getMediaServerItem() == null) {
            return;
        }
        MediaServer serverItem = mediaServerService.getOne(event.getMediaServerItem().getId());
        if (serverItem == null) {
            return;
        }
        logger.info("[ABL-HOOK事件-心跳] ID:" + event.getMediaServerItem().getId());
        online(serverItem, null);
    }
 
    @Async("taskExecutor")
    @EventListener
    public void onApplicationEvent(MediaServerDeleteEvent event) {
        if (event.getMediaServerId() == null) {
            return;
        }
        logger.info("[ABL-节点被移除] ID:" + event.getMediaServerId());
        offlineABLPrimaryMap.remove(event.getMediaServerId());
        offlineAblsecondaryMap.remove(event.getMediaServerId());
        offlineAblTimeMap.remove(event.getMediaServerId());
    }
 
    @Scheduled(fixedDelay = 10*1000)   //每隔10秒检查一次
    public void execute(){
        // 初次加入的离线节点会在30分钟内,每间隔十秒尝试一次,30分钟后如果仍然没有上线,则每隔30分钟尝试一次连接
        if (offlineABLPrimaryMap.isEmpty() && offlineAblsecondaryMap.isEmpty()) {
            return;
        }
        if (!offlineABLPrimaryMap.isEmpty()) {
            for (MediaServer mediaServerItem : offlineABLPrimaryMap.values()) {
                if (offlineAblTimeMap.get(mediaServerItem.getId()) <  System.currentTimeMillis() - 30*60*1000) {
                    offlineAblsecondaryMap.put(mediaServerItem.getId(), mediaServerItem);
                    offlineABLPrimaryMap.remove(mediaServerItem.getId());
                    continue;
                }
                logger.info("[ABL-尝试连接] ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
                JSONObject responseJson = ablResTfulUtils.getServerConfig(mediaServerItem);
                AblServerConfig ablServerConfig = null;
                if (responseJson == null) {
                    logger.info("[ABL-尝试连接]失败, ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
                    continue;
                }
                JSONArray data = responseJson.getJSONArray("params");
                if (data == null || data.isEmpty()) {
                    logger.info("[ABL-尝试连接]失败, ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
                }else {
                    ablServerConfig = AblServerConfig.getInstance(data);
                    initPort(mediaServerItem, ablServerConfig);
                    online(mediaServerItem, ablServerConfig);
                }
            }
        }
        if (!offlineAblsecondaryMap.isEmpty()) {
            for (MediaServer mediaServerItem : offlineAblsecondaryMap.values()) {
                if (offlineAblTimeMap.get(mediaServerItem.getId()) <  System.currentTimeMillis() - 30*60*1000) {
                    continue;
                }
                logger.info("[ABL-尝试连接] ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
                JSONObject responseJson = ablResTfulUtils.getServerConfig(mediaServerItem);
                AblServerConfig ablServerConfig = null;
                if (responseJson == null) {
                    logger.info("[ABL-尝试连接]失败, ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
                    offlineAblTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis());
                    continue;
                }
                JSONArray data = responseJson.getJSONArray("params");
                if (data == null || data.isEmpty()) {
                    logger.info("[ABL-尝试连接]失败, ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
                    offlineAblTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis());
                }else {
                    ablServerConfig = AblServerConfig.getInstance(data);
                    initPort(mediaServerItem, ablServerConfig);
                    online(mediaServerItem, ablServerConfig);
                }
            }
        }
    }
 
    private void online(MediaServer mediaServerItem, AblServerConfig config) {
        offlineABLPrimaryMap.remove(mediaServerItem.getId());
        offlineAblsecondaryMap.remove(mediaServerItem.getId());
        offlineAblTimeMap.remove(mediaServerItem.getId());
        if (!mediaServerItem.isStatus()) {
            logger.info("[ABL-连接成功] ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
            mediaServerItem.setStatus(true);
            mediaServerItem.setHookAliveInterval(10F);
            mediaServerService.update(mediaServerItem);
            if(mediaServerItem.isAutoConfig()) {
                if (config == null) {
                    JSONObject responseJSON = ablResTfulUtils.getServerConfig(mediaServerItem);
                    JSONArray data = responseJSON.getJSONArray("params");
                    if (data != null && !data.isEmpty()) {
                        config = AblServerConfig.getInstance(data);
                    }
                }
                if (config != null) {
                    initPort(mediaServerItem, config);
                    setAblConfig(mediaServerItem, false, config);
                }
            }
            mediaServerService.update(mediaServerItem);
        }
        // 设置两次心跳未收到则认为zlm离线
        String key = "ABL-keepalive-" + mediaServerItem.getId();
        dynamicTask.startDelay(key, ()->{
            logger.warn("[ABL-心跳超时] ID:{}", mediaServerItem.getId());
            mediaServerItem.setStatus(false);
            offlineABLPrimaryMap.put(mediaServerItem.getId(), mediaServerItem);
            offlineAblTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis());
            // TODO 发送离线通知
            mediaServerService.update(mediaServerItem);
        }, (int)(mediaServerItem.getHookAliveInterval() * 2 * 1000));
    }
    private void initPort(MediaServer mediaServerItem, AblServerConfig ablServerConfig) {
        // 端口只会从配置中读取一次,一旦自己配置或者读取过了将不在配置
//        if (mediaServerItem.getHttpSSlPort() == 0) {
//            mediaServerItem.setHttpSSlPort(ablServerConfig.getHttpSSLport());
//        }
        if (mediaServerItem.getRtmpPort() == 0 && ablServerConfig.getRtmpPort() != null) {
            mediaServerItem.setRtmpPort(ablServerConfig.getRtmpPort());
        }
//        if (mediaServerItem.getRtmpSSlPort() == 0) {
//            mediaServerItem.setRtmpSSlPort(ablServerConfig.getRtmpSslPort());
//        }
        if (mediaServerItem.getRtspPort() == 0 && ablServerConfig.getRtspPort() != null) {
            mediaServerItem.setRtspPort(ablServerConfig.getRtspPort());
        }
        if (mediaServerItem.getFlvPort() == 0 && ablServerConfig.getHttpFlvPort() != null) {
            mediaServerItem.setFlvPort(ablServerConfig.getHttpFlvPort());
        }
        if (mediaServerItem.getWsFlvPort() == 0 && ablServerConfig.getWsPort() != null) {
            mediaServerItem.setWsFlvPort(ablServerConfig.getWsPort());
        }
        if (mediaServerItem.getRtpProxyPort() == 0 && ablServerConfig.getPsTsRecvPort() != null) {
            mediaServerItem.setRtpProxyPort(ablServerConfig.getPsTsRecvPort());
        }
//        if (mediaServerItem.getRtspSSLPort() == 0) {
//            mediaServerItem.setRtspSSLPort(ablServerConfig.getRtspSSlport());
//        }
//        if (mediaServerItem.getRtpProxyPort() == 0) {
//            mediaServerItem.setRtpProxyPort(ablServerConfig.getRtpProxyPort());
//        }
        mediaServerItem.setHookAliveInterval(10F);
    }
 
    public void setAblConfig(MediaServer mediaServerItem, boolean restart, AblServerConfig config) {
        try {
            if (config.getHookEnable() == 0) {
                logger.info("[媒体服务节点-ABL]  开启HOOK功能 :{}", mediaServerItem.getId());
                JSONObject responseJSON = ablResTfulUtils.setConfigParamValue(mediaServerItem, "hook_enable", "1");
                if (responseJSON.getInteger("code") == 0) {
                    logger.info("[媒体服务节点-ABL]  开启HOOK功能成功 :{}", mediaServerItem.getId());
                }else {
                    logger.info("[媒体服务节点-ABL]  开启HOOK功能失败 :{}->{}", mediaServerItem.getId(), responseJSON.getString("memo"));
                }
            }
        }catch (Exception e) {
            logger.info("[媒体服务节点-ABL]  开启HOOK功能失败 :{}", mediaServerItem.getId(), e);
        }
        // 设置相关的HOOK
        String[] hookUrlArray = {
                "on_stream_arrive",
                "on_stream_none_reader",
                "on_record_mp4",
                "on_stream_disconnect",
                "on_stream_not_found",
                "on_server_started",
                "on_publish",
                "on_play",
                "on_record_progress",
                "on_server_keepalive",
                "on_stream_not_arrive",
                "on_delete_record_mp4",
        };
 
        String protocol = sslEnabled ? "https" : "http";
        String hookPrefix = String.format("%s://%s:%s/index/hook/abl", protocol, mediaServerItem.getHookIp(), serverPort);
        Field[] fields = AblServerConfig.class.getDeclaredFields();
        for (Field field : fields) {
            try {
                if (field.isAnnotationPresent(ConfigKeyId.class)) {
                    ConfigKeyId configKeyId = field.getAnnotation(ConfigKeyId.class);
                    for (String hook : hookUrlArray) {
                        if (configKeyId.value().equals(hook)) {
                            String hookUrl =  String.format("%s/%s", hookPrefix, hook);
                            field.setAccessible(true);
                            // 利用反射获取值后对比是否与配置中相同,不同则进行设置
                            if (!hookUrl.equals(field.get(config))) {
                                JSONObject responseJSON = ablResTfulUtils.setConfigParamValue(mediaServerItem, hook, hookUrl);
                                if (responseJSON.getInteger("code") == 0) {
                                    logger.info("[媒体服务节点-ABL]  设置HOOK {} 成功 :{}", hook, mediaServerItem.getId());
                                }else {
                                    logger.info("[媒体服务节点-ABL]  设置HOOK {} 失败 :{}->{}", hook, mediaServerItem.getId(), responseJSON.getString("memo"));
                                }
                            }
                        }
                    }
                }
            }catch (Exception e) {
                logger.info("[媒体服务节点-ABL]  设置HOOK 失败 :{}", mediaServerItem.getId(), e);
            }
        }
 
 
 
 
//        Map<String, Object> param = new HashMap<>();
//        param.put("api.secret",mediaServerItem.getSecret()); // -profile:v Baseline
//        if (mediaServerItem.getRtspPort() != 0) {
//            param.put("ffmpeg.snap", "%s -rtsp_transport tcp -i %s -y -f mjpeg -frames:v 1 %s");
//        }
//        param.put("hook.enable","1");
//        param.put("hook.on_flow_report","");
//        param.put("hook.on_play",String.format("%s/on_play", hookPrefix));
//        param.put("hook.on_http_access","");
//        param.put("hook.on_publish", String.format("%s/on_publish", hookPrefix));
//        param.put("hook.on_record_ts","");
//        param.put("hook.on_rtsp_auth","");
//        param.put("hook.on_rtsp_realm","");
//        param.put("hook.on_server_started",String.format("%s/on_server_started", hookPrefix));
//        param.put("hook.on_shell_login","");
//        param.put("hook.on_stream_changed",String.format("%s/on_stream_changed", hookPrefix));
//        param.put("hook.on_stream_none_reader",String.format("%s/on_stream_none_reader", hookPrefix));
//        param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrefix));
//        param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrefix));
//        param.put("hook.on_send_rtp_stopped",String.format("%s/on_send_rtp_stopped", hookPrefix));
//        param.put("hook.on_rtp_server_timeout",String.format("%s/on_rtp_server_timeout", hookPrefix));
//        param.put("hook.on_record_mp4",String.format("%s/on_record_mp4", hookPrefix));
//        param.put("hook.timeoutSec","30");
//        param.put("hook.alive_interval", mediaServerItem.getHookAliveInterval());
//        // 推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。
//        // 置0关闭此特性(推流断开会导致立即断开播放器)
//        // 此参数不应大于播放器超时时间
//        // 优化此消息以更快的收到流注销事件
//        param.put("protocol.continue_push_ms", "3000" );
//        // 最多等待未初始化的Track时间,单位毫秒,超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流,
//        // 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项
//        if (mediaServerItem.isRtpEnable() && !ObjectUtils.isEmpty(mediaServerItem.getRtpPortRange())) {
//            param.put("rtp_proxy.port_range", mediaServerItem.getRtpPortRange().replace(",", "-"));
//        }
//
//        if (!ObjectUtils.isEmpty(mediaServerItem.getRecordPath())) {
//            File recordPathFile = new File(mediaServerItem.getRecordPath());
//            param.put("protocol.mp4_save_path", recordPathFile.getParentFile().getPath());
//            param.put("protocol.downloadRoot", recordPathFile.getParentFile().getPath());
//            param.put("record.appName", recordPathFile.getName());
//        }
//
//        JSONObject responseJSON = ablResTfulUtils.setConfigParamValue(mediaServerItem, param);
//
//        if (responseJSON != null && responseJSON.getInteger("code") == 0) {
//            if (restart) {
//                logger.info("[媒体服务节点] 设置成功,开始重启以保证配置生效 {} -> {}:{}",
//                        mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
//                ablResTfulUtils.restartServer(mediaServerItem);
//            }else {
//                logger.info("[媒体服务节点] 设置成功 {} -> {}:{}",
//                        mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
//            }
//        }else {
//            logger.info("[媒体服务节点] 设置媒体服务节点失败 {} -> {}:{}",
//                    mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
//        }
    }
 
}