648540858
2022-10-25 f3454caaf6241c44e82704711a3a1efe264f400f
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
@@ -6,20 +6,17 @@
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.*;
@@ -35,19 +32,13 @@
    private ZLMRESTfulUtils zlmresTfulUtils;
    @Autowired
    private ZLMHttpHookSubscribe hookSubscribe;
    @Autowired
    private IStreamProxyService streamProxyService;
    private ZlmHttpHookSubscribe hookSubscribe;
    @Autowired
    private EventPublisher publisher;
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private MediaConfig mediaConfig;
@@ -67,22 +58,29 @@
            mediaServerService.updateToDatabase(mediaSerItem);
        }
        mediaServerService.syncCatchFromDatabase();
        HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started();
        // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
        hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started,new JSONObject(),
        hookSubscribe.addSubscribe(hookSubscribeForServerStarted,
                (MediaServerItem mediaServerItem, JSONObject response)->{
            ZLMServerConfig zlmServerConfig = JSONObject.toJavaObject(response, ZLMServerConfig.class);
            if (zlmServerConfig !=null ) {
                if (startGetMedia != null) {
                    startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId());
                    if (startGetMedia.size() == 0) {
                        hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
                    }
                }
            }
        });
        // 获取zlm信息
        logger.info("[zlm] 等待默认zlm中...");
        // 获取所有的zlm, 并开启主动连接
        List<MediaServerItem> all = mediaServerService.getAllFromDatabase();
        Map<String, MediaServerItem> allMap = new HashMap<>();
        mediaServerService.updateVmServer(all);
        if (all.size() == 0) {
            all.add(mediaConfig.getMediaSerItem());
@@ -93,6 +91,7 @@
            }
            startGetMedia.put(mediaServerItem.getId(), true);
            connectZlmServer(mediaServerItem);
            allMap.put(mediaServerItem.getId(), mediaServerItem);
        }
        String taskKey = "zlm-connect-timeout";
        dynamicTask.startDelay(taskKey, ()->{
@@ -103,12 +102,17 @@
                }
                startGetMedia = null;
            }
            hookSubscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started, new JSONObject());
        //  TODO 清理数据库中与redis不匹配的zlm
            // 获取redis中所有的zlm
            List<MediaServerItem> allInRedis = mediaServerService.getAll();
            for (MediaServerItem mediaServerItem : allInRedis) {
                if (!allMap.containsKey(mediaServerItem.getId())) {
                    mediaServerService.delete(mediaServerItem.getId());
                }
            }
        }, 60 * 1000 );
    }
    @Async
    @Async("taskExecutor")
    public void connectZlmServer(MediaServerItem mediaServerItem){
        String connectZlmServerTaskKey = "connect-zlm-" + mediaServerItem.getId();
        ZLMServerConfig zlmServerConfigFirst = getMediaServerConfig(mediaServerItem);
@@ -116,6 +120,9 @@
            zlmServerConfigFirst.setIp(mediaServerItem.getIp());
            zlmServerConfigFirst.setHttpPort(mediaServerItem.getHttpPort());
            startGetMedia.remove(mediaServerItem.getId());
            if (startGetMedia.size() == 0) {
                hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
            }
            mediaServerService.zlmServerOnline(zlmServerConfigFirst);
        }else {
            logger.info("[ {} ]-[ {}:{} ]主动连接失败, 清理相关资源, 开始尝试重试连接",
@@ -130,6 +137,9 @@
                zlmServerConfig.setIp(mediaServerItem.getIp());
                zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort());
                startGetMedia.remove(mediaServerItem.getId());
                if (startGetMedia.size() == 0) {
                    hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
                }
                mediaServerService.zlmServerOnline(zlmServerConfig);
            }
        }, 2000);