src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java
@@ -2,10 +2,10 @@ import io.swagger.v3.oas.annotations.media.Schema; import java.net.URL; import java.io.Serializable; @Schema(description = "流信息") public class StreamInfo { public class StreamInfo implements Serializable { @Schema(description = "应用名") private String app; @@ -79,6 +79,94 @@ @Schema(description = "是否暂停(录像回放使用)") private boolean pause; public void setFlv(StreamURL flv) { this.flv = flv; } public void setHttps_flv(StreamURL https_flv) { this.https_flv = https_flv; } public void setWs_flv(StreamURL ws_flv) { this.ws_flv = ws_flv; } public void setWss_flv(StreamURL wss_flv) { this.wss_flv = wss_flv; } public void setFmp4(StreamURL fmp4) { this.fmp4 = fmp4; } public void setHttps_fmp4(StreamURL https_fmp4) { this.https_fmp4 = https_fmp4; } public void setWs_fmp4(StreamURL ws_fmp4) { this.ws_fmp4 = ws_fmp4; } public void setWss_fmp4(StreamURL wss_fmp4) { this.wss_fmp4 = wss_fmp4; } public void setHls(StreamURL hls) { this.hls = hls; } public void setHttps_hls(StreamURL https_hls) { this.https_hls = https_hls; } public void setWs_hls(StreamURL ws_hls) { this.ws_hls = ws_hls; } public void setWss_hls(StreamURL wss_hls) { this.wss_hls = wss_hls; } public void setTs(StreamURL ts) { this.ts = ts; } public void setHttps_ts(StreamURL https_ts) { this.https_ts = https_ts; } public void setWs_ts(StreamURL ws_ts) { this.ws_ts = ws_ts; } public void setWss_ts(StreamURL wss_ts) { this.wss_ts = wss_ts; } public void setRtmp(StreamURL rtmp) { this.rtmp = rtmp; } public void setRtmps(StreamURL rtmps) { this.rtmps = rtmps; } public void setRtsp(StreamURL rtsp) { this.rtsp = rtsp; } public void setRtsps(StreamURL rtsps) { this.rtsps = rtsps; } public void setRtc(StreamURL rtc) { this.rtc = rtc; } public void setRtcs(StreamURL rtcs) { this.rtcs = rtcs; } public void setRtmp(String host, int port, int sslPort, String app, String stream, String callIdParam) { String file = String.format("%s/%s/%s", app, stream, callIdParam); this.rtmp = new StreamURL("rtmp", host, port, file); src/main/java/com/genersoft/iot/vmp/common/StreamURL.java
@@ -2,9 +2,11 @@ import io.swagger.v3.oas.annotations.media.Schema; import java.io.Serializable; @Schema(description = "流地址信息") public class StreamURL { public class StreamURL implements Serializable { @Schema(description = "协议") private String protocol; src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java
@@ -20,8 +20,6 @@ Integer ptzSpeed = 50; Integer keepaliveTimeOut = 255; Integer registerTimeInterval = 120; private boolean alarm; @@ -50,9 +48,6 @@ this.ptzSpeed = ptzSpeed; } public void setKeepaliveTimeOut(Integer keepaliveTimeOut) { this.keepaliveTimeOut = keepaliveTimeOut; } public void setRegisterTimeInterval(Integer registerTimeInterval) { this.registerTimeInterval = registerTimeInterval; @@ -84,10 +79,6 @@ public Integer getPtzSpeed() { return ptzSpeed; } public Integer getKeepaliveTimeOut() { return keepaliveTimeOut; } public Integer getRegisterTimeInterval() { src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -11,6 +11,7 @@ import org.springframework.boot.CommandLineRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import javax.sip.*; import java.util.*; @@ -107,6 +108,9 @@ } public SipProviderImpl getUdpSipProvider(String ip) { if (ObjectUtils.isEmpty(ip)) { return null; } return udpSipProviderMap.get(ip); } @@ -125,6 +129,9 @@ } public SipProviderImpl getTcpSipProvider(String ip) { if (ObjectUtils.isEmpty(ip)) { return null; } return tcpSipProviderMap.get(ip); } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java
@@ -5,22 +5,19 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.utils.GitUtil; import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import javax.sip.SipException; import javax.sip.SipFactory; import javax.sip.header.CallIdHeader; import javax.sip.header.UserAgentHeader; import javax.sip.header.ViaHeader; import javax.sip.message.Message; import javax.sip.message.Request; import javax.sip.message.Response; import java.net.InetAddress; import java.text.ParseException; /** @@ -109,6 +106,10 @@ } public CallIdHeader getNewCallIdHeader(String ip, String transport){ if (ObjectUtils.isEmpty(ip) || ObjectUtils.isEmpty(transport)) { return transport.equalsIgnoreCase("TCP") ? sipLayer.getTcpSipProvider().getNewCallId() : sipLayer.getUdpSipProvider().getNewCallId(); } return transport.equalsIgnoreCase("TCP") ? sipLayer.getTcpSipProvider(ip).getNewCallId() : sipLayer.getUdpSipProvider(ip).getNewCallId(); } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java
@@ -1,14 +1,15 @@ package com.genersoft.iot.vmp.gb28181.transmit.callback; import java.util.HashMap; import com.alibaba.fastjson2.JSON; import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import org.springframework.web.context.request.async.DeferredResult; import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import org.springframework.web.context.request.async.DeferredResult; /** * @description: 异步请求处理 @@ -51,31 +52,48 @@ public static final String CALLBACK_CMD_BROADCAST = "CALLBACK_BROADCAST"; private Map<String, Map<String, DeferredResult>> map = new ConcurrentHashMap<>(); private Map<String, Map<String, DeferredResultEx>> map = new ConcurrentHashMap<>(); public void put(String key, String id, DeferredResult result) { Map<String, DeferredResult> deferredResultMap = map.get(key); public void put(String key, String id, DeferredResultEx result) { Map<String, DeferredResultEx> deferredResultMap = map.get(key); if (deferredResultMap == null) { deferredResultMap = new ConcurrentHashMap<>(); map.put(key, deferredResultMap); } deferredResultMap.put(id, result); } public DeferredResult get(String key, String id) { Map<String, DeferredResult> deferredResultMap = map.get(key); public void put(String key, String id, DeferredResult result) { Map<String, DeferredResultEx> deferredResultMap = map.get(key); if (deferredResultMap == null) { deferredResultMap = new ConcurrentHashMap<>(); map.put(key, deferredResultMap); } deferredResultMap.put(id, new DeferredResultEx(result)); } public DeferredResultEx get(String key, String id) { Map<String, DeferredResultEx> deferredResultMap = map.get(key); if (deferredResultMap == null || ObjectUtils.isEmpty(id)) { return null; } return deferredResultMap.get(id); } public Collection<DeferredResultEx> getAllByKey(String key) { Map<String, DeferredResultEx> deferredResultMap = map.get(key); if (deferredResultMap == null) { return null; } return deferredResultMap.values(); } public boolean exist(String key, String id){ if (key == null) { return false; } Map<String, DeferredResult> deferredResultMap = map.get(key); Map<String, DeferredResultEx> deferredResultMap = map.get(key); if (id == null) { return deferredResultMap != null; }else { @@ -88,15 +106,15 @@ * @param msg */ public void invokeResult(RequestMessage msg) { Map<String, DeferredResult> deferredResultMap = map.get(msg.getKey()); Map<String, DeferredResultEx> deferredResultMap = map.get(msg.getKey()); if (deferredResultMap == null) { return; } DeferredResult result = deferredResultMap.get(msg.getId()); DeferredResultEx result = deferredResultMap.get(msg.getId()); if (result == null) { return; } result.setResult(msg.getData()); result.getDeferredResult().setResult(msg.getData()); deferredResultMap.remove(msg.getId()); if (deferredResultMap.size() == 0) { map.remove(msg.getKey()); @@ -108,18 +126,27 @@ * @param msg */ public void invokeAllResult(RequestMessage msg) { Map<String, DeferredResult> deferredResultMap = map.get(msg.getKey()); Map<String, DeferredResultEx> deferredResultMap = map.get(msg.getKey()); if (deferredResultMap == null) { return; } Set<String> ids = deferredResultMap.keySet(); for (String id : ids) { DeferredResult result = deferredResultMap.get(id); DeferredResultEx result = deferredResultMap.get(id); if (result == null) { return; } result.setResult(msg.getData()); if (result.getFilter() != null) { Object handler = result.getFilter().handler(msg.getData()); System.out.println(JSON.toJSONString(handler)); result.getDeferredResult().setResult(handler); }else { result.getDeferredResult().setResult(msg.getData()); } } map.remove(msg.getKey()); } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -8,14 +8,13 @@ import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; @@ -39,9 +38,10 @@ import org.springframework.stereotype.Component; import javax.sdp.*; import javax.sip.*; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; import javax.sip.header.CallIdHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; @@ -479,7 +479,7 @@ playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> { logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId); redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); }, null); }); } else { sendRtpItem.setStreamId(playTransaction.getStream()); // 写入redis, 超时时回复 src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
@@ -13,7 +13,6 @@ import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; @@ -25,12 +24,12 @@ */ public interface IPlayService { void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid); void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId); void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback, String uuid); PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback); InviteTimeOutCallback timeoutCallback); void play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback); MediaServerItem getNewMediaServerItem(Device device); src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -1,49 +1,28 @@ package com.genersoft.iot.vmp.service.impl; import java.math.BigDecimal; import java.math.RoundingMode; import java.text.ParseException; import java.util.*; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ServiceException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; import org.springframework.web.context.request.async.DeferredResult; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ServiceException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; @@ -53,8 +32,27 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; import java.math.BigDecimal; import java.math.RoundingMode; import java.text.ParseException; import java.util.List; import java.util.UUID; @SuppressWarnings(value = {"rawtypes", "unchecked"}) @Service @@ -111,46 +109,19 @@ private ThreadPoolTaskExecutor taskExecutor; @Override public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, Runnable timeoutCallback) { public void play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, Runnable timeoutCallback) { if (mediaServerItem == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); } PlayResult playResult = new PlayResult(); RequestMessage msg = new RequestMessage(); String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; msg.setKey(key); String uuid = UUID.randomUUID().toString(); msg.setId(uuid); playResult.setUuid(uuid); DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); playResult.setResult(result); // 录像查询以channelId作为deviceId查询 resultHolder.put(key, uuid, result); Device device = redisCatchStorage.getDevice(deviceId); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); playResult.setDevice(device); result.onCompletion(() -> { // 点播结束时调用截图接口 taskExecutor.execute(() -> { // TODO 应该在上流时调用更好,结束也可能是错误结束 String path = "snap"; String fileName = deviceId + "_" + channelId + ".jpg"; WVPResult wvpResult = (WVPResult) result.getResult(); if (Objects.requireNonNull(wvpResult).getCode() == 0) { StreamInfo streamInfoForSuccess = (StreamInfo) wvpResult.getData(); MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId()); String streamUrl = streamInfoForSuccess.getFmp4().getUrl(); // 请求截图 logger.info("[请求截图]: " + fileName); zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName); } }); }); if (streamInfo != null) { String streamId = streamInfo.getStream(); @@ -160,7 +131,7 @@ wvpResult.setMsg("点播失败, redis缓存streamId等于null"); msg.setData(wvpResult); resultHolder.invokeAllResult(msg); return playResult; return; } String mediaServerId = streamInfo.getMediaServerId(); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); @@ -178,14 +149,13 @@ msg.setData(wvpResult); resultHolder.invokeAllResult(msg); return playResult; return; } else { WVPResult wvpResult = new WVPResult(); wvpResult.setCode(ErrorCode.SUCCESS.getCode()); wvpResult.setMsg(ErrorCode.SUCCESS.getMsg()); wvpResult.setData(streamInfo); msg.setData(wvpResult); resultHolder.invokeAllResult(msg); if (hookEvent != null) { hookEvent.response(mediaServerItem, JSON.parseObject(JSON.toJSONString(streamInfo))); @@ -211,7 +181,6 @@ streamId = String.format("%s_%s", device.getDeviceId(), channelId); } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); logger.info(JSONObject.toJSONString(ssrcInfo)); play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response) -> { if (hookEvent != null) { hookEvent.response(mediaServerItem, response); @@ -238,16 +207,15 @@ msg.setData(wvpResult); // 回复之前所有的点播请求 resultHolder.invokeAllResult(msg); }, uuid); }); } return playResult; } @Override public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback, String uuid) { InviteTimeOutCallback timeoutCallback) { String streamId = null; if (mediaServerItem.isRtpEnable()) { @@ -281,6 +249,16 @@ //端口获取失败的ssrcInfo 没有必要发送点播指令 if (ssrcInfo.getPort() <= 0) { logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo); dynamicTask.stop(timeOutTaskKey); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); RequestMessage msg = new RequestMessage(); msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + device.getDeviceId() + channelId); msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "点播端口分配异常")); resultHolder.invokeAllResult(msg); return; } try { @@ -289,9 +267,15 @@ System.out.println("停止超时任务: " + timeOutTaskKey); dynamicTask.stop(timeOutTaskKey); // hook响应 onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid); onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId); hookEvent.response(mediaServerItemInuse, response); logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId); String streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", stream); String path = "snap"; String fileName = device.getDeviceId() + "_" + channelId + ".jpg"; // 请求截图 logger.info("[请求截图]: " + fileName); zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName); }, (event) -> { ResponseEvent responseEvent = (ResponseEvent) event.event; @@ -331,7 +315,7 @@ logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); dynamicTask.stop(timeOutTaskKey); // hook响应 onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid); onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId); hookEvent.response(mediaServerItemInUse, response); }); } @@ -367,13 +351,41 @@ } @Override public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) { StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); RequestMessage msg = new RequestMessage(); if (uuid != null) { msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { deviceChannel.setStreamId(streamInfo.getStream()); storager.startPlay(deviceId, channelId, streamInfo.getStream()); } redisCatchStorage.startPlay(streamInfo); WVPResult wvpResult = new WVPResult(); wvpResult.setCode(ErrorCode.SUCCESS.getCode()); wvpResult.setMsg(ErrorCode.SUCCESS.getMsg()); wvpResult.setData(streamInfo); msg.setData(wvpResult); resultHolder.invokeAllResult(msg); } else { logger.warn("设备预览API调用失败!"); msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败!")); resultHolder.invokeAllResult(msg); } } private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { RequestMessage msg = new RequestMessage(); msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); if (!ObjectUtils.isEmpty(uuid)) { msg.setId(uuid); } msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { @@ -390,8 +402,8 @@ resultHolder.invokeAllResult(msg); } else { logger.warn("设备预览API调用失败!"); msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败!")); logger.warn("录像回放调用失败!"); msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "录像回放调用失败!")); resultHolder.invokeAllResult(msg); } } @@ -545,7 +557,7 @@ logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); dynamicTask.stop(playBackTimeOutTaskKey); // hook响应 onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid); onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid); hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream())); }); } @@ -568,6 +580,8 @@ return result; } @Override public DeferredResult<WVPResult<StreamInfo>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) { Device device = storager.queryVideoDevice(deviceId); src/main/java/com/genersoft/iot/vmp/utils/redis/FastJsonRedisSerializer.java
@@ -1,14 +1,12 @@ package com.genersoft.iot.vmp.utils.redis; import java.nio.charset.Charset; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONReader; import com.alibaba.fastjson2.JSONWriter; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.SerializationException; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONWriter.Feature; import java.nio.charset.Charset; /** * @description:使用fastjson实现redis的序列化 @@ -31,7 +29,7 @@ if (t == null) { return new byte[0]; } return JSON.toJSONString(t, JSONWriter.Feature.WriteClassName).getBytes(DEFAULT_CHARSET); return JSON.toJSONString(t, JSONWriter.Feature.WriteClassName, JSONWriter.Feature.WritePairAsJavaBean).getBytes(DEFAULT_CHARSET); } @Override @@ -42,4 +40,6 @@ String str = new String(bytes, DEFAULT_CHARSET); return JSON.parseObject(str, clazz, JSONReader.Feature.SupportAutoType); } } src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultEx.java
New file @@ -0,0 +1,31 @@ package com.genersoft.iot.vmp.vmanager.bean; import org.springframework.web.context.request.async.DeferredResult; public class DeferredResultEx<T> { private DeferredResult<T> deferredResult; private DeferredResultFilter filter; public DeferredResultEx(DeferredResult<T> result) { this.deferredResult = result; } public DeferredResult<T> getDeferredResult() { return deferredResult; } public void setDeferredResult(DeferredResult<T> deferredResult) { this.deferredResult = deferredResult; } public DeferredResultFilter getFilter() { return filter; } public void setFilter(DeferredResultFilter filter) { this.filter = filter; } } src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultFilter.java
New file @@ -0,0 +1,6 @@ package com.genersoft.iot.vmp.vmanager.bean; public interface DeferredResultFilter { Object handler(Object o); } src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
@@ -1,44 +1,36 @@ package com.genersoft.iot.vmp.vmanager.gb28181.play; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.tags.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.CrossOrigin; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.sip.InvalidArgumentException; import javax.sip.SipException; @@ -91,16 +83,52 @@ public DeferredResult<WVPResult<StreamInfo>> play(HttpServletRequest request, @PathVariable String deviceId, @PathVariable String channelId) { String localAddr = request.getLocalAddr(); String localName = request.getLocalName(); String remoteHost = request.getRemoteHost(); String remoteAddr = request.getRemoteAddr(); String remoteUser = request.getRemoteUser(); String requestURI = request.getRequestURI(); System.out.println(3333333); System.out.println(localAddr); System.out.println(localName); System.out.println(remoteHost); System.out.println(remoteAddr); System.out.println(remoteUser); System.out.println(requestURI); System.out.println(4444444); // 获取可用的zlm Device device = storager.queryVideoDevice(deviceId); MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device); PlayResult playResult = playService.play(newMediaServerItem, deviceId, channelId, null, null, null); playResult.getResult().onCompletion(()->{ WVPResult<StreamInfo> result = (WVPResult<StreamInfo>)playResult.getResult().getResult(); result.getData().channgeStreamIp(request.getLocalAddr()); playResult.getResult().setResult(result); RequestMessage msg = new RequestMessage(); String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; boolean exist = resultHolder.exist(key, null); msg.setKey(key); String uuid = UUID.randomUUID().toString(); msg.setId(uuid); DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); DeferredResultEx<WVPResult<StreamInfo>> deferredResultEx = new DeferredResultEx<>(result); deferredResultEx.setFilter(result1 -> { System.out.println(1111); System.out.println(request.getLocalName()); WVPResult<StreamInfo> wvpResult = (WVPResult<StreamInfo>)result1; if (wvpResult.getCode() == ErrorCode.SUCCESS.getCode()) { StreamInfo data = wvpResult.getData(); data.channgeStreamIp(request.getLocalName()); ((WVPResult<StreamInfo>)result1).setData(data); } return result1; }); return playResult.getResult(); // 录像查询以channelId作为deviceId查询 resultHolder.put(key, uuid, deferredResultEx); if (!exist) { playService.play(newMediaServerItem, deviceId, channelId, null, null, null); } return result; } src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java
@@ -112,7 +112,7 @@ return resultDeferredResult; } MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device); PlayResult play = playService.play(newMediaServerItem, serial, code, (mediaServerItem, response)->{ playService.play(newMediaServerItem, serial, code, (mediaServerItem, response)->{ StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(serial, code); JSONObject result = new JSONObject(); result.put("StreamID", streamInfo.getStream()); src/main/resources/all-application.yml
@@ -105,8 +105,6 @@ id: 44010200492000000001 # [可选] 默认设备认证密码,后续扩展使用设备单独密码, 移除密码将不进行校验 password: admin123 # [可选] 心跳超时时间, 建议设置为心跳周期的三倍 keepalive-timeout: 255 # [可选] 国标级联注册失败,再次发起注册的时间间隔。 默认60秒 register-time-interval: 60 # [可选] 云台控制速度