648540858
2022-07-28 fd091e545ba174adc36a9d3370e6d4c040ad33fd
优化hook订阅机制
7个文件已修改
5个文件已添加
408 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java 52 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java 85 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -10,6 +10,9 @@
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@@ -348,25 +351,19 @@
    @Override
    public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
                              ZLMHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) {
        String streamId = ssrcInfo.getStream();
        String stream = ssrcInfo.getStream();
        try {
            if (device == null) {
                return;
            }
            String streamMode = device.getStreamMode().toUpperCase();
            logger.info("{} 分配的ZLM为: {} [{}:{}]", streamId, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
            // 添加订阅
            JSONObject subscribeKey = new JSONObject();
            subscribeKey.put("app", "rtp");
            subscribeKey.put("stream", streamId);
            subscribeKey.put("regist", true);
            subscribeKey.put("schema", "rtmp");
            subscribeKey.put("mediaServerId", mediaServerItem.getId());
            subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
                    (MediaServerItem mediaServerItemInUse, JSONObject json)->{
            logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId());
            subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
                if (event != null) {
                    event.response(mediaServerItemInUse, json);
                    subscribe.removeSubscribe(hookSubscribe);
                }
            });
            //
@@ -440,7 +437,7 @@
                errorEvent.response(e);
            }), e ->{
                // 这里为了避免一个通道的点播只有一个callID，这个参数使用一个固定值
                streamSession.put(device.getDeviceId(), channelId ,"play", streamId, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play);
                streamSession.put(device.getDeviceId(), channelId ,"play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play);
                streamSession.put(device.getDeviceId(), channelId ,"play", e.dialog);
                okEvent.response(e);
            });
@@ -530,21 +527,14 @@
            CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtmp", mediaServerItem.getId());
            // 添加订阅
            JSONObject subscribeKey = new JSONObject();
            subscribeKey.put("app", "rtp");
            subscribeKey.put("stream", ssrcInfo.getStream());
            subscribeKey.put("regist", true);
            subscribeKey.put("schema", "rtmp");
            subscribeKey.put("mediaServerId", mediaServerItem.getId());
            logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey);
            subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
                    (MediaServerItem mediaServerItemInUse, JSONObject json)->{
            subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
                        if (hookEvent != null) {
                            InviteStreamInfo inviteStreamInfo = new InviteStreamInfo(mediaServerItemInUse, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream());
                            hookEvent.call(inviteStreamInfo);
                        }
                        subscribe.removeSubscribe(hookSubscribe);
                    });
            Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc());
@@ -643,21 +633,15 @@
            CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId());
            // 添加订阅
            JSONObject subscribeKey = new JSONObject();
            subscribeKey.put("app", "rtp");
            subscribeKey.put("stream", ssrcInfo.getStream());
            subscribeKey.put("regist", true);
            subscribeKey.put("mediaServerId", mediaServerItem.getId());
            logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
            subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
                    (MediaServerItem mediaServerItemInUse, JSONObject json)->{
            subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
                        hookEvent.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
                        subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
                        subscribeKey.put("regist", false);
                        subscribeKey.put("schema", "rtmp");
                        subscribe.removeSubscribe(hookSubscribe);
                        hookSubscribe.getContent().put("regist", false);
                        hookSubscribe.getContent().put("schema", "rtmp");
                        // 添加流注销的订阅,注销了后向设备发送bye
                        subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
                        subscribe.addSubscribe(hookSubscribe,
                                (MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd)->{
                                    ClientTransaction transaction = streamSession.getTransaction(device.getDeviceId(), channelId, ssrcInfo.getStream(), callIdHeader.getCallId());
                                    if (transaction != null) {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -102,7 +102,7 @@
            logger.debug("[ ZLM HOOK ] on_server_keepalive API调用,参数:" + json.toString());
        }
        String mediaServerId = json.getString("mediaServerId");
        List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_keepalive);
        List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
        if (subscribes != null  && subscribes.size() > 0) {
            for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
                subscribe.response(null, json);
@@ -168,7 +168,7 @@
            logger.debug("[ ZLM HOOK ]on_play API调用,参数:" + JSON.toJSONString(param));
        }
        String mediaServerId = param.getMediaServerId();
        ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json);
        ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json);
        if (subscribe != null ) {
            MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
            if (mediaInfo != null) {
@@ -253,7 +253,7 @@
        }
        ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json);
        ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
        if (subscribe != null) {
            if (mediaInfo != null) {
                subscribe.response(mediaInfo, json);
@@ -377,7 +377,7 @@
            logger.debug("[ ZLM HOOK ]on_shell_login API调用,参数:" + json.toString());
        }
        String mediaServerId = json.getString("mediaServerId");
        ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_shell_login, json);
        ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_shell_login, json);
        if (subscribe != null ) {
            MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
            if (mediaInfo != null) {
@@ -403,7 +403,7 @@
        logger.info("[ ZLM HOOK ]on_stream_changed API调用,参数:" + JSONObject.toJSONString(item));
        String mediaServerId = item.getMediaServerId();
        JSONObject json = (JSONObject) JSON.toJSON(item);
        ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json);
        ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
        if (subscribe != null ) {
            MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
            if (mediaInfo != null) {
@@ -614,7 +614,7 @@
        }
        String remoteAddr = request.getRemoteAddr();
        jsonObject.put("ip", remoteAddr);
        List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_started);
        List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
        if (subscribes != null  && subscribes.size() > 0) {
            for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
                subscribe.response(null, jsonObject);
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java
@@ -1,12 +1,16 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
 * @description:针对 ZLMediaServer的hook事件订阅
@@ -16,49 +20,38 @@
@Component
public class ZLMHttpHookSubscribe {
    public enum HookType{
        on_flow_report,
        on_http_access,
        on_play,
        on_publish,
        on_record_mp4,
        on_rtsp_auth,
        on_rtsp_realm,
        on_shell_login,
        on_stream_changed,
        on_stream_none_reader,
        on_stream_not_found,
        on_server_started,
        on_server_keepalive
    }
    /**
     * Callback invoked when a hook notification is delivered to a subscriber.
     */
    @FunctionalInterface
    public interface Event{
        /**
         * Handles one hook notification.
         *
         * @param mediaServerItem media server passed along with the notification; some callers
         *                        in ZLMHttpHookListener pass {@code null} here
         * @param response        JSON payload handed to the subscriber
         */
        void response(MediaServerItem mediaServerItem, JSONObject response);
    }
    private Map<HookType, Map<JSONObject, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
    private Map<HookType, Map<IHookSubscribe, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
    public void addSubscribe(HookType type, JSONObject hookResponse, ZLMHttpHookSubscribe.Event event) {
        allSubscribes.computeIfAbsent(type, k -> new ConcurrentHashMap<>()).put(hookResponse, event);
    public void addSubscribe(IHookSubscribe hookSubscribe, ZLMHttpHookSubscribe.Event event) {
        if (hookSubscribe.getExpires() == null) {
            // 默认5分钟过期
            Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5));
            hookSubscribe.setExpires(expiresInstant);
        }
        allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
    }
    public ZLMHttpHookSubscribe.Event getSubscribe(HookType type, JSONObject hookResponse) {
    public ZLMHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
        ZLMHttpHookSubscribe.Event event= null;
        Map<JSONObject, Event> eventMap = allSubscribes.get(type);
        Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
        if (eventMap == null) {
            return null;
        }
        for (JSONObject key : eventMap.keySet()) {
        for (IHookSubscribe key : eventMap.keySet()) {
            Boolean result = null;
            for (String s : key.keySet()) {
            for (String s : key.getContent().keySet()) {
                if (result == null) {
                    result = key.getString(s).equals(hookResponse.getString(s));
                    result = key.getContent().getString(s).equals(hookResponse.getString(s));
                }else {
                    if (key.getString(s) == null) {
                    if (key.getContent().getString(s) == null) {
                        continue;
                    }
                    result = result && key.getString(s).equals(hookResponse.getString(s));
                    result = result && key.getContent().getString(s).equals(hookResponse.getString(s));
                }
            }
@@ -69,26 +62,30 @@
        return event;
    }
    public void removeSubscribe(HookType type, JSONObject hookResponse) {
        Map<JSONObject, Event> eventMap = allSubscribes.get(type);
    public void removeSubscribe(IHookSubscribe hookSubscribe) {
        Map<IHookSubscribe, Event> eventMap = allSubscribes.get(hookSubscribe.getHookType());
        if (eventMap == null) {
            return;
        }
        Set<Map.Entry<JSONObject, Event>> entries = eventMap.entrySet();
        Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet();
        if (entries.size() > 0) {
            List<Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
            for (Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event> entry : entries) {
                JSONObject key = entry.getKey();
            List<Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
            for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entries) {
                JSONObject content = entry.getKey().getContent();
                if (content == null || content.size() == 0) {
                    entriesToRemove.add(entry);
                    continue;
                }
                Boolean result = null;
                for (String s : key.keySet()) {
                for (String s : content.keySet()) {
                    if (result == null) {
                        result = key.getString(s).equals(hookResponse.getString(s));
                        result = content.getString(s).equals(hookSubscribe.getContent().getString(s));
                    }else {
                        if (key.getString(s) == null) {
                        if (content.getString(s) == null) {
                            continue;
                        }
                        result = result && key.getString(s).equals(hookResponse.getString(s));
                        result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s));
                    }
                }
                if (null != result && result){
@@ -97,7 +94,7 @@
            }
            if (!CollectionUtils.isEmpty(entriesToRemove)) {
                for (Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) {
                for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) {
                    entries.remove(entry);
                }
            }
@@ -111,17 +108,25 @@
     * @return
     */
    public List<ZLMHttpHookSubscribe.Event> getSubscribes(HookType type) {
        // ZLMHttpHookSubscribe.Event event= null;
        Map<JSONObject, Event> eventMap = allSubscribes.get(type);
        Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
        if (eventMap == null) {
            return null;
        }
        List<ZLMHttpHookSubscribe.Event> result = new ArrayList<>();
        for (JSONObject key : eventMap.keySet()) {
        for (IHookSubscribe key : eventMap.keySet()) {
            result.add(eventMap.get(key));
        }
        return result;
    }
    public List<IHookSubscribe> getAll(){
        ArrayList<IHookSubscribe> result = new ArrayList<>();
        Collection<Map<IHookSubscribe, Event>> values = allSubscribes.values();
        for (Map<IHookSubscribe, Event> value : values) {
            result.addAll(value.keySet());
        }
        return result;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
@@ -6,22 +6,22 @@
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Component
@Order(value=1)
@@ -38,16 +38,10 @@
    private ZLMHttpHookSubscribe hookSubscribe;
    @Autowired
    private IStreamProxyService streamProxyService;
    @Autowired
    private EventPublisher publisher;
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private MediaConfig mediaConfig;
@@ -67,16 +61,24 @@
            mediaServerService.updateToDatabase(mediaSerItem);
        }
        mediaServerService.syncCatchFromDatabase();
        HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started();
//        Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.SECONDS.toSeconds(60));
//        hookSubscribeForStreamChange.setExpires(expiresInstant);
        // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
        hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started,new JSONObject(),
        hookSubscribe.addSubscribe(hookSubscribeForServerStarted,
                (MediaServerItem mediaServerItem, JSONObject response)->{
            ZLMServerConfig zlmServerConfig = JSONObject.toJavaObject(response, ZLMServerConfig.class);
            if (zlmServerConfig !=null ) {
                if (startGetMedia != null) {
                    startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId());
                    if (startGetMedia.size() == 0) {
                        hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
                    }
                }
            }
        });
        // 获取zlm信息
        logger.info("[zlm] 等待默认zlm中...");
@@ -103,7 +105,6 @@
                }
                startGetMedia = null;
            }
            hookSubscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started, new JSONObject());
        //  TODO 清理数据库中与redis不匹配的zlm
        }, 60 * 1000 );
    }
@@ -116,6 +117,9 @@
            zlmServerConfigFirst.setIp(mediaServerItem.getIp());
            zlmServerConfigFirst.setHttpPort(mediaServerItem.getHttpPort());
            startGetMedia.remove(mediaServerItem.getId());
            if (startGetMedia.size() == 0) {
                hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
            }
            mediaServerService.zlmServerOnline(zlmServerConfigFirst);
        }else {
            logger.info("[ {} ]-[ {}:{} ]主动连接失败, 清理相关资源, 开始尝试重试连接",
@@ -130,6 +134,9 @@
                zlmServerConfig.setIp(mediaServerItem.getIp());
                zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort());
                startGetMedia.remove(mediaServerItem.getId());
                if (startGetMedia.size() == 0) {
                    hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
                }
                mediaServerService.zlmServerOnline(zlmServerConfig);
            }
        }, 2000);
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java
New file
@@ -0,0 +1,33 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.alibaba.fastjson.JSONObject;
/**
 * hook 订阅工厂
 * @author lin
 */
public class HookSubscribeFactory {
    public static HookSubscribeForStreamChange on_stream_changed(String app, String stream, boolean regist, String scheam, String mediaServerId) {
        HookSubscribeForStreamChange hookSubscribe = new HookSubscribeForStreamChange();
        JSONObject subscribeKey = new com.alibaba.fastjson.JSONObject();
        subscribeKey.put("app", app);
        subscribeKey.put("stream", stream);
        subscribeKey.put("regist", regist);
        if (scheam != null) {
            subscribeKey.put("schema", scheam);
        }
        subscribeKey.put("mediaServerId", mediaServerId);
        hookSubscribe.setContent(subscribeKey);
        return hookSubscribe;
    }
    public static HookSubscribeForServerStarted on_server_started() {
        HookSubscribeForServerStarted hookSubscribe = new HookSubscribeForServerStarted();
        hookSubscribe.setContent(new JSONObject());
        return hookSubscribe;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java
New file
@@ -0,0 +1,44 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.annotation.JSONField;
import java.time.Instant;
/**
 * hook订阅-流变化
 * @author lin
 */
public class HookSubscribeForServerStarted implements IHookSubscribe{
    private HookType hookType = HookType.on_server_started;
    private JSONObject content;
    @JSONField(format="yyyy-MM-dd HH:mm:ss")
    private Instant expires;
    @Override
    public HookType getHookType() {
        return hookType;
    }
    @Override
    public JSONObject getContent() {
        return content;
    }
    public void setContent(JSONObject content) {
        this.content = content;
    }
    @Override
    public Instant getExpires() {
        return expires;
    }
    @Override
    public void setExpires(Instant expires) {
        this.expires = expires;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java
New file
@@ -0,0 +1,43 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.annotation.JSONField;
import java.time.Instant;
/**
 * hook订阅-流变化
 * @author lin
 */
public class HookSubscribeForStreamChange implements IHookSubscribe{
    private HookType hookType = HookType.on_stream_changed;
    private JSONObject content;
    private Instant expires;
    @Override
    public HookType getHookType() {
        return hookType;
    }
    @Override
    public JSONObject getContent() {
        return content;
    }
    public void setContent(JSONObject content) {
        this.content = content;
    }
    @Override
    public Instant getExpires() {
        return expires;
    }
    @Override
    public void setExpires(Instant expires) {
        this.expires = expires;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java
New file
@@ -0,0 +1,23 @@
package com.genersoft.iot.vmp.media.zlm.dto;
/**
 * Types of hook events that a subscription can target.
 *
 * <p>NOTE(review): the constant names appear to mirror the hook names used by the
 * media server, which would explain the lower-case snake_case spelling — confirm
 * before renaming any of them.
 *
 * @author lin
 */
public enum HookType {
    on_flow_report,
    on_http_access,
    on_play,
    on_publish,
    on_record_mp4,
    on_rtsp_auth,
    on_rtsp_realm,
    on_shell_login,
    on_stream_changed,
    on_stream_none_reader,
    on_stream_not_found,
    on_server_started,
    on_server_keepalive
}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java
New file
@@ -0,0 +1,36 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.alibaba.fastjson.JSONObject;
import java.time.Instant;
/**
 * Parameters of a ZLM hook event subscription.
 * @author lin
 */
public interface IHookSubscribe {
    /**
     * Returns the hook type of this subscription.
     * @return the hook type
     */
    HookType getHookType();
    /**
     * Returns the concrete content of the hook subscription.
     * @return the concrete content of the hook
     */
    JSONObject getContent();
    /**
     * Sets the expiry time.
     * @param instant the expiry time
     */
    void setExpires(Instant instant);
    /**
     * Returns the expiry time.
     * @return the expiry time
     */
    Instant getExpires();
}
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -13,6 +13,9 @@
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@@ -296,16 +299,10 @@
                    // 单端口模式streamId也有变化,需要重新设置监听
                    if (!mediaServerItem.isRtpEnable()) {
                        // 添加订阅
                        JSONObject subscribeKey = new JSONObject();
                        subscribeKey.put("app", "rtp");
                        subscribeKey.put("stream", stream);
                        subscribeKey.put("regist", true);
                        subscribeKey.put("schema", "rtmp");
                        subscribeKey.put("mediaServerId", mediaServerItem.getId());
                        subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed,subscribeKey);
                        subscribeKey.put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
                        subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
                                (MediaServerItem mediaServerItemInUse, JSONObject response)->{
                        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId());
                        subscribe.removeSubscribe(hookSubscribe);
                        hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
                        subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{
                                    logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
                                    dynamicTask.stop(timeOutTaskKey);
                                    // hook响应
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java
@@ -8,6 +8,9 @@
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.*;
@@ -270,14 +273,9 @@
            }, userSetting.getPlatformPlayTimeout());
            // 添加订阅
            JSONObject subscribeKey = new JSONObject();
            subscribeKey.put("app", content.getApp());
            subscribeKey.put("stream", content.getStream());
            subscribeKey.put("regist", true);
            subscribeKey.put("schema", "rtmp");
            subscribeKey.put("mediaServerId", mediaServerItem.getId());
            subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
                    (MediaServerItem mediaServerItemInUse, JSONObject json)->{
            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtmp", mediaServerItem.getId());
            subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
                        dynamicTask.stop(taskKey);
                        responseSendItem(mediaServerItem, content, toId, serial);
                    });
src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java
@@ -8,6 +8,8 @@
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.VersionInfo;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.utils.SpringBeanFactory;
@@ -38,7 +40,7 @@
public class ServerController {
    @Autowired
    private ConfigurableApplicationContext context;
    private ZLMHttpHookSubscribe zlmHttpHookSubscribe;
    @Autowired
    private IMediaServerService mediaServerService;
@@ -254,6 +256,18 @@
        return result;
    }
    @ApiOperation("获取当前所有hook")
    @GetMapping(value = "/hooks")
    @ResponseBody
    public WVPResult<List<IHookSubscribe>> getHooks(){
        WVPResult<List<IHookSubscribe>> result = new WVPResult<>();
        result.setCode(0);
        result.setMsg("success");
        List<IHookSubscribe> all = zlmHttpHookSubscribe.getAll();
        result.setData(all);
        return result;
    }
//    @ApiOperation("当前进行中的动态任务")
//    @GetMapping(value = "/dynamicTask")
//    @ResponseBody