648540858
2022-11-10 8870f5f5a182f4af527dc2b89ad75063019df14f
优化使用来源ip作为流ip
13个文件已修改
2个文件已添加
494 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java 92 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/StreamURL.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java 65 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 166 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/utils/redis/FastJsonRedisSerializer.java 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultEx.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultFilter.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/all-application.yml 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java
@@ -2,10 +2,10 @@
import io.swagger.v3.oas.annotations.media.Schema;
import java.net.URL;
import java.io.Serializable;
@Schema(description = "流信息")
public class StreamInfo {
public class StreamInfo implements Serializable {
    @Schema(description = "应用名")
    private String app;
@@ -79,6 +79,94 @@
    @Schema(description = "是否暂停(录像回放使用)")
    private boolean pause;
    // ---- Plain setters for the per-protocol StreamURL fields --------------------
    // Each setter below stores the supplied StreamURL in the field of the same name
    // without any validation or copying. The underscore-style names mirror the field
    // names exactly (e.g. https_flv, ws_fmp4).

    // FLV variants
    public void setFlv(StreamURL flv) {
        this.flv = flv;
    }
    public void setHttps_flv(StreamURL https_flv) {
        this.https_flv = https_flv;
    }
    public void setWs_flv(StreamURL ws_flv) {
        this.ws_flv = ws_flv;
    }
    public void setWss_flv(StreamURL wss_flv) {
        this.wss_flv = wss_flv;
    }
    // fMP4 variants
    public void setFmp4(StreamURL fmp4) {
        this.fmp4 = fmp4;
    }
    public void setHttps_fmp4(StreamURL https_fmp4) {
        this.https_fmp4 = https_fmp4;
    }
    public void setWs_fmp4(StreamURL ws_fmp4) {
        this.ws_fmp4 = ws_fmp4;
    }
    public void setWss_fmp4(StreamURL wss_fmp4) {
        this.wss_fmp4 = wss_fmp4;
    }
    // HLS variants
    public void setHls(StreamURL hls) {
        this.hls = hls;
    }
    public void setHttps_hls(StreamURL https_hls) {
        this.https_hls = https_hls;
    }
    public void setWs_hls(StreamURL ws_hls) {
        this.ws_hls = ws_hls;
    }
    public void setWss_hls(StreamURL wss_hls) {
        this.wss_hls = wss_hls;
    }
    // TS variants
    public void setTs(StreamURL ts) {
        this.ts = ts;
    }
    public void setHttps_ts(StreamURL https_ts) {
        this.https_ts = https_ts;
    }
    public void setWs_ts(StreamURL ws_ts) {
        this.ws_ts = ws_ts;
    }
    public void setWss_ts(StreamURL wss_ts) {
        this.wss_ts = wss_ts;
    }
    // RTMP / RTSP / RTC variants
    public void setRtmp(StreamURL rtmp) {
        this.rtmp = rtmp;
    }
    public void setRtmps(StreamURL rtmps) {
        this.rtmps = rtmps;
    }
    public void setRtsp(StreamURL rtsp) {
        this.rtsp = rtsp;
    }
    public void setRtsps(StreamURL rtsps) {
        this.rtsps = rtsps;
    }
    public void setRtc(StreamURL rtc) {
        this.rtc = rtc;
    }
    public void setRtcs(StreamURL rtcs) {
        this.rtcs = rtcs;
    }
    public void setRtmp(String host, int port, int sslPort, String app, String stream, String callIdParam) {
        String file = String.format("%s/%s/%s", app, stream, callIdParam);
        this.rtmp = new StreamURL("rtmp", host, port, file);
src/main/java/com/genersoft/iot/vmp/common/StreamURL.java
@@ -2,9 +2,11 @@
import io.swagger.v3.oas.annotations.media.Schema;
import java.io.Serializable;
@Schema(description = "流地址信息")
public class StreamURL {
public class StreamURL implements Serializable {
    @Schema(description = "协议")
    private String protocol;
src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java
@@ -20,8 +20,6 @@
    
    Integer ptzSpeed = 50;
    Integer keepaliveTimeOut = 255;
    Integer registerTimeInterval = 120;
    private boolean alarm;
@@ -50,9 +48,6 @@
        this.ptzSpeed = ptzSpeed;
    }
    public void setKeepaliveTimeOut(Integer keepaliveTimeOut) {
        this.keepaliveTimeOut = keepaliveTimeOut;
    }
    public void setRegisterTimeInterval(Integer registerTimeInterval) {
        this.registerTimeInterval = registerTimeInterval;
@@ -84,10 +79,6 @@
    public Integer getPtzSpeed() {
        return ptzSpeed;
    }
    public Integer getKeepaliveTimeOut() {
        return keepaliveTimeOut;
    }
    public Integer getRegisterTimeInterval() {
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -11,6 +11,7 @@
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.*;
import java.util.*;
@@ -107,6 +108,9 @@
    }
    public SipProviderImpl getUdpSipProvider(String ip) {
        if (ObjectUtils.isEmpty(ip)) {
            return null;
        }
        return udpSipProviderMap.get(ip);
    }
@@ -125,6 +129,9 @@
    }
    public SipProviderImpl getTcpSipProvider(String ip) {
        if (ObjectUtils.isEmpty(ip)) {
            return null;
        }
        return tcpSipProviderMap.get(ip);
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java
@@ -5,22 +5,19 @@
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.utils.GitUtil;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.SipException;
import javax.sip.SipFactory;
import javax.sip.header.CallIdHeader;
import javax.sip.header.UserAgentHeader;
import javax.sip.header.ViaHeader;
import javax.sip.message.Message;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.net.InetAddress;
import java.text.ParseException;
/**
@@ -109,6 +106,10 @@
    }
    public CallIdHeader getNewCallIdHeader(String ip, String transport){
        if (ObjectUtils.isEmpty(ip) || ObjectUtils.isEmpty(transport)) {
            return  transport.equalsIgnoreCase("TCP") ? sipLayer.getTcpSipProvider().getNewCallId()
                    : sipLayer.getUdpSipProvider().getNewCallId();
        }
        return  transport.equalsIgnoreCase("TCP") ? sipLayer.getTcpSipProvider(ip).getNewCallId()
                : sipLayer.getUdpSipProvider(ip).getNewCallId();
    }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java
@@ -1,14 +1,15 @@
package com.genersoft.iot.vmp.gb28181.transmit.callback;
import java.util.HashMap;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.async.DeferredResult;
/**    
 * @description: 异步请求处理
@@ -51,31 +52,48 @@
    public static final String CALLBACK_CMD_BROADCAST = "CALLBACK_BROADCAST";
    private Map<String, Map<String, DeferredResult>> map = new ConcurrentHashMap<>();
    private Map<String, Map<String, DeferredResultEx>> map = new ConcurrentHashMap<>();
    public void put(String key, String id, DeferredResult result) {
        Map<String, DeferredResult> deferredResultMap = map.get(key);
    public void put(String key, String id, DeferredResultEx result) {
        Map<String, DeferredResultEx> deferredResultMap = map.get(key);
        if (deferredResultMap == null) {
            deferredResultMap = new ConcurrentHashMap<>();
            map.put(key, deferredResultMap);
        }
        deferredResultMap.put(id, result);
    }
    public DeferredResult get(String key, String id) {
        Map<String, DeferredResult> deferredResultMap = map.get(key);
    public void put(String key, String id, DeferredResult result) {
        Map<String, DeferredResultEx> deferredResultMap = map.get(key);
        if (deferredResultMap == null) {
            deferredResultMap = new ConcurrentHashMap<>();
            map.put(key, deferredResultMap);
        }
        deferredResultMap.put(id, new DeferredResultEx(result));
    }
    public DeferredResultEx get(String key, String id) {
        Map<String, DeferredResultEx> deferredResultMap = map.get(key);
        if (deferredResultMap == null || ObjectUtils.isEmpty(id)) {
            return null;
        }
        return deferredResultMap.get(id);
    }
    public Collection<DeferredResultEx> getAllByKey(String key) {
        Map<String, DeferredResultEx> deferredResultMap = map.get(key);
        if (deferredResultMap == null) {
            return null;
        }
        return deferredResultMap.values();
    }
    public boolean exist(String key, String id){
        if (key == null) {
            return false;
        }
        Map<String, DeferredResult> deferredResultMap = map.get(key);
        Map<String, DeferredResultEx> deferredResultMap = map.get(key);
        if (id == null) {
            return deferredResultMap != null;
        }else {
@@ -88,15 +106,15 @@
     * @param msg
     */
    public void invokeResult(RequestMessage msg) {
        Map<String, DeferredResult> deferredResultMap = map.get(msg.getKey());
        Map<String, DeferredResultEx> deferredResultMap = map.get(msg.getKey());
        if (deferredResultMap == null) {
            return;
        }
        DeferredResult result = deferredResultMap.get(msg.getId());
        DeferredResultEx result = deferredResultMap.get(msg.getId());
        if (result == null) {
            return;
        }
        result.setResult(msg.getData());
        result.getDeferredResult().setResult(msg.getData());
        deferredResultMap.remove(msg.getId());
        if (deferredResultMap.size() == 0) {
            map.remove(msg.getKey());
@@ -108,18 +126,27 @@
     * @param msg
     */
    public void invokeAllResult(RequestMessage msg) {
        Map<String, DeferredResult> deferredResultMap = map.get(msg.getKey());
        Map<String, DeferredResultEx> deferredResultMap = map.get(msg.getKey());
        if (deferredResultMap == null) {
            return;
        }
        Set<String> ids = deferredResultMap.keySet();
        for (String id : ids) {
            DeferredResult result = deferredResultMap.get(id);
            DeferredResultEx result = deferredResultMap.get(id);
            if (result == null) {
                return;
            }
            result.setResult(msg.getData());
            if (result.getFilter() != null) {
                Object handler = result.getFilter().handler(msg.getData());
                System.out.println(JSON.toJSONString(handler));
                result.getDeferredResult().setResult(handler);
            }else {
                result.getDeferredResult().setResult(msg.getData());
            }
        }
        map.remove(msg.getKey());
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -8,14 +8,13 @@
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
@@ -39,9 +38,10 @@
import org.springframework.stereotype.Component;
import javax.sdp.*;
import javax.sip.*;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
import java.time.Instant;
@@ -479,7 +479,7 @@
                            playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> {
                                logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId);
                                redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
                            }, null);
                            });
                        } else {
                            sendRtpItem.setStreamId(playTransaction.getStream());
                            // 写入redis, 超时时回复
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
@@ -13,7 +13,6 @@
import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.InvalidArgumentException;
@@ -25,12 +24,12 @@
 */
public interface IPlayService {
    void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid);
    void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId);
    void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
              ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
              InviteTimeOutCallback timeoutCallback, String uuid);
    PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback);
              InviteTimeOutCallback timeoutCallback);
    void play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback);
    MediaServerItem getNewMediaServerItem(Device device);
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -1,49 +1,28 @@
package com.genersoft.iot.vmp.service.impl;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.ParseException;
import java.util.*;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.exception.ServiceException;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.springframework.web.context.request.async.DeferredResult;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.exception.ServiceException;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.IPlayService;
@@ -53,8 +32,27 @@
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.ParseException;
import java.util.List;
import java.util.UUID;
@SuppressWarnings(value = {"rawtypes", "unchecked"})
@Service
@@ -111,46 +109,19 @@
    private ThreadPoolTaskExecutor taskExecutor;
    @Override
    public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
                           ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                           Runnable timeoutCallback) {
    public void play(MediaServerItem mediaServerItem, String deviceId, String channelId,
                                 ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                                 Runnable timeoutCallback) {
        if (mediaServerItem == null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
        }
        PlayResult playResult = new PlayResult();
        RequestMessage msg = new RequestMessage();
        String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
        msg.setKey(key);
        String uuid = UUID.randomUUID().toString();
        msg.setId(uuid);
        playResult.setUuid(uuid);
        DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
        playResult.setResult(result);
        // 录像查询以channelId作为deviceId查询
        resultHolder.put(key, uuid, result);
        Device device = redisCatchStorage.getDevice(deviceId);
        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
        playResult.setDevice(device);
        result.onCompletion(() -> {
            // 点播结束时调用截图接口
            taskExecutor.execute(() -> {
                // TODO 应该在上流时调用更好,结束也可能是错误结束
                String path = "snap";
                String fileName = deviceId + "_" + channelId + ".jpg";
                WVPResult wvpResult = (WVPResult) result.getResult();
                if (Objects.requireNonNull(wvpResult).getCode() == 0) {
                    StreamInfo streamInfoForSuccess = (StreamInfo) wvpResult.getData();
                    MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
                    String streamUrl = streamInfoForSuccess.getFmp4().getUrl();
                    // 请求截图
                    logger.info("[请求截图]: " + fileName);
                    zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
                }
            });
        });
        if (streamInfo != null) {
            String streamId = streamInfo.getStream();
@@ -160,7 +131,7 @@
                wvpResult.setMsg("点播失败, redis缓存streamId等于null");
                msg.setData(wvpResult);
                resultHolder.invokeAllResult(msg);
                return playResult;
                return;
            }
            String mediaServerId = streamInfo.getMediaServerId();
            MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
@@ -178,14 +149,13 @@
                        msg.setData(wvpResult);
                        resultHolder.invokeAllResult(msg);
                        return playResult;
                        return;
                    } else {
                        WVPResult wvpResult = new WVPResult();
                        wvpResult.setCode(ErrorCode.SUCCESS.getCode());
                        wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
                        wvpResult.setData(streamInfo);
                        msg.setData(wvpResult);
                        resultHolder.invokeAllResult(msg);
                        if (hookEvent != null) {
                            hookEvent.response(mediaServerItem, JSON.parseObject(JSON.toJSONString(streamInfo)));
@@ -211,7 +181,6 @@
                streamId = String.format("%s_%s", device.getDeviceId(), channelId);
            }
            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
            logger.info(JSONObject.toJSONString(ssrcInfo));
            play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response) -> {
                if (hookEvent != null) {
                    hookEvent.response(mediaServerItem, response);
@@ -238,16 +207,15 @@
                msg.setData(wvpResult);
                // 回复之前所有的点播请求
                resultHolder.invokeAllResult(msg);
            }, uuid);
            });
        }
        return playResult;
    }
    @Override
    public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
                     ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                     InviteTimeOutCallback timeoutCallback, String uuid) {
                     InviteTimeOutCallback timeoutCallback) {
        String streamId = null;
        if (mediaServerItem.isRtpEnable()) {
@@ -281,6 +249,16 @@
        //端口获取失败的ssrcInfo 没有必要发送点播指令
        if (ssrcInfo.getPort() <= 0) {
            logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
            dynamicTask.stop(timeOutTaskKey);
            // 释放ssrc
            mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
            streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
            RequestMessage msg = new RequestMessage();
            msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + device.getDeviceId() + channelId);
            msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "点播端口分配异常"));
            resultHolder.invokeAllResult(msg);
            return;
        }
        try {
@@ -289,9 +267,15 @@
                System.out.println("停止超时任务: " + timeOutTaskKey);
                dynamicTask.stop(timeOutTaskKey);
                // hook响应
                onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid);
                onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId);
                hookEvent.response(mediaServerItemInuse, response);
                logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
                String streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp",  stream);
                String path = "snap";
                String fileName = device.getDeviceId() + "_" + channelId + ".jpg";
                // 请求截图
                logger.info("[请求截图]: " + fileName);
                zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
            }, (event) -> {
                ResponseEvent responseEvent = (ResponseEvent) event.event;
@@ -331,7 +315,7 @@
                                logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
                                dynamicTask.stop(timeOutTaskKey);
                                // hook响应
                                onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
                                onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId);
                                hookEvent.response(mediaServerItemInUse, response);
                            });
                        }
@@ -367,13 +351,41 @@
    }
    @Override
    public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
    public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) {
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
        RequestMessage msg = new RequestMessage();
        if (uuid != null) {
        msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
        if (streamInfo != null) {
            DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
            if (deviceChannel != null) {
                deviceChannel.setStreamId(streamInfo.getStream());
                storager.startPlay(deviceId, channelId, streamInfo.getStream());
            }
            redisCatchStorage.startPlay(streamInfo);
            WVPResult wvpResult = new WVPResult();
            wvpResult.setCode(ErrorCode.SUCCESS.getCode());
            wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
            wvpResult.setData(streamInfo);
            msg.setData(wvpResult);
            resultHolder.invokeAllResult(msg);
        } else {
            logger.warn("设备预览API调用失败!");
            msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败!"));
            resultHolder.invokeAllResult(msg);
        }
    }
    private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
        RequestMessage msg = new RequestMessage();
        msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
        if (!ObjectUtils.isEmpty(uuid)) {
            msg.setId(uuid);
        }
        msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
        if (streamInfo != null) {
            DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
            if (deviceChannel != null) {
@@ -390,8 +402,8 @@
            resultHolder.invokeAllResult(msg);
        } else {
            logger.warn("设备预览API调用失败!");
            msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败!"));
            logger.warn("录像回放调用失败!");
            msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "录像回放调用失败!"));
            resultHolder.invokeAllResult(msg);
        }
    }
@@ -545,7 +557,7 @@
                                            logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
                                            dynamicTask.stop(playBackTimeOutTaskKey);
                                            // hook响应
                                            onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
                                            onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
                                            hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
                                        });
                                    }
@@ -568,6 +580,8 @@
        return result;
    }
    @Override
    public DeferredResult<WVPResult<StreamInfo>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
        Device device = storager.queryVideoDevice(deviceId);
src/main/java/com/genersoft/iot/vmp/utils/redis/FastJsonRedisSerializer.java
@@ -1,14 +1,12 @@
package com.genersoft.iot.vmp.utils.redis;
import java.nio.charset.Charset;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONReader;
import com.alibaba.fastjson2.JSONWriter;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONWriter.Feature;
import java.nio.charset.Charset;
/**    
 * @description:使用fastjson实现redis的序列化   
@@ -31,7 +29,7 @@
        if (t == null) {
            return new byte[0];
        }
        return JSON.toJSONString(t, JSONWriter.Feature.WriteClassName).getBytes(DEFAULT_CHARSET);
        return JSON.toJSONString(t, JSONWriter.Feature.WriteClassName, JSONWriter.Feature.WritePairAsJavaBean).getBytes(DEFAULT_CHARSET);
    }
 
    @Override
@@ -42,4 +40,6 @@
        String str = new String(bytes, DEFAULT_CHARSET);
        return JSON.parseObject(str, clazz, JSONReader.Feature.SupportAutoType);
    }
}
src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultEx.java
New file
@@ -0,0 +1,31 @@
package com.genersoft.iot.vmp.vmanager.bean;
import org.springframework.web.context.request.async.DeferredResult;
public class DeferredResultEx<T> {
    private DeferredResult<T> deferredResult;
    private DeferredResultFilter filter;
    public DeferredResultEx(DeferredResult<T> result) {
        this.deferredResult = result;
    }
    public DeferredResult<T> getDeferredResult() {
        return deferredResult;
    }
    public void setDeferredResult(DeferredResult<T> deferredResult) {
        this.deferredResult = deferredResult;
    }
    public DeferredResultFilter getFilter() {
        return filter;
    }
    public void setFilter(DeferredResultFilter filter) {
        this.filter = filter;
    }
}
src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultFilter.java
New file
@@ -0,0 +1,6 @@
package com.genersoft.iot.vmp.vmanager.bean;
/**
 * Single-method callback that receives an object and returns an object.
 *
 * <p>Instances are attached to a {@code DeferredResultEx} via {@code setFilter}; callers may
 * supply them as lambdas. The annotation makes the compiler reject any change that would add a
 * second abstract method and thereby break those lambda call sites.
 */
@FunctionalInterface
public interface DeferredResultFilter {

    /**
     * Processes the given value and returns the value to use in its place.
     *
     * @param o the value to process; the interface itself places no constraint on its type
     * @return the processed value (an implementation may return the same instance)
     */
    Object handler(Object o);
}
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
@@ -1,44 +1,36 @@
package com.genersoft.iot.vmp.vmanager.gb28181.play;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
@@ -91,16 +83,52 @@
    public DeferredResult<WVPResult<StreamInfo>> play(HttpServletRequest request, @PathVariable String deviceId,
                                                      @PathVariable String channelId) {
        // NOTE(review): this span appears to interleave lines from before and after a change
        // (two return statements, unbalanced braces around the onCompletion lambda) — confirm
        // against the actual file before relying on the statement order shown here.

        // Read local/remote address details, the remote user and the request URI from the request.
        String localAddr = request.getLocalAddr();
        String localName = request.getLocalName();
        String remoteHost = request.getRemoteHost();
        String remoteAddr = request.getRemoteAddr();
        String remoteUser = request.getRemoteUser();
        String requestURI = request.getRequestURI();
        // Print the values read above to stdout, bracketed by two numeric markers.
        System.out.println(3333333);
        System.out.println(localAddr);
        System.out.println(localName);
        System.out.println(remoteHost);
        System.out.println(remoteAddr);
        System.out.println(remoteUser);
        System.out.println(requestURI);
        System.out.println(4444444);
        // Get an available zlm
        Device device = storager.queryVideoDevice(deviceId);
        MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
        PlayResult playResult = playService.play(newMediaServerItem, deviceId, channelId, null, null, null);
        // On completion: take the result, call channgeStreamIp with the request's local address,
        // and set the result again.
        playResult.getResult().onCompletion(()->{
            WVPResult<StreamInfo> result = (WVPResult<StreamInfo>)playResult.getResult().getResult();
            result.getData().channgeStreamIp(request.getLocalAddr());
            playResult.getResult().setResult(result);
        RequestMessage msg = new RequestMessage();
        // Key is the play-command prefix followed by the device id and channel id.
        String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
        // Checked before this request is registered; decides below whether play is invoked.
        boolean exist = resultHolder.exist(key, null);
        msg.setKey(key);
        String uuid = UUID.randomUUID().toString();
        msg.setId(uuid);
        // Deferred result constructed with the value of userSetting.getPlayTimeout().
        DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
        DeferredResultEx<WVPResult<StreamInfo>> deferredResultEx = new DeferredResultEx<>(result);
        // Filter: when the result code equals SUCCESS, call channgeStreamIp on the payload with
        // request.getLocalName(); the (same) result object is returned in every case.
        deferredResultEx.setFilter(result1 -> {
            System.out.println(1111);
            System.out.println(request.getLocalName());
            WVPResult<StreamInfo> wvpResult = (WVPResult<StreamInfo>)result1;
            if (wvpResult.getCode() == ErrorCode.SUCCESS.getCode()) {
                StreamInfo data = wvpResult.getData();
                data.channgeStreamIp(request.getLocalName());
                ((WVPResult<StreamInfo>)result1).setData(data);
            }
            return result1;
        });
        return playResult.getResult();
        // Recording queries use channelId as the deviceId
        resultHolder.put(key, uuid, deferredResultEx);
        // Only invoke play when the key was not already present in the holder.
        if (!exist) {
            playService.play(newMediaServerItem, deviceId, channelId, null, null, null);
        }
        return result;
    }
src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java
@@ -112,7 +112,7 @@
            return resultDeferredResult;
        }
        MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
        PlayResult play = playService.play(newMediaServerItem, serial, code, (mediaServerItem, response)->{
        playService.play(newMediaServerItem, serial, code, (mediaServerItem, response)->{
            StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(serial, code);
            JSONObject result = new JSONObject();
            result.put("StreamID", streamInfo.getStream());
src/main/resources/all-application.yml
@@ -105,8 +105,6 @@
    id: 44010200492000000001
    # [可选] 默认设备认证密码,后续扩展使用设备单独密码, 移除密码将不进行校验
    password: admin123
    # [可选] 心跳超时时间, 建议设置为心跳周期的三倍
    keepalive-timeout: 255
    # [可选] 国标级联注册失败,再次发起注册的时间间隔。 默认60秒
    register-time-interval: 60
    # [可选] 云台控制速度