src/main/java/com/genersoft/iot/vmp/common/GeneralCallback.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/resources/application.yml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/main/java/com/genersoft/iot/vmp/common/GeneralCallback.java
New file @@ -0,0 +1,5 @@ package com.genersoft.iot.vmp.common; public interface GeneralCallback<T>{ void run(int code, String msg, T data); } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java
@@ -39,6 +39,8 @@ public static final String CALLBACK_CMD_DOWNLOAD = "CALLBACK_DOWNLOAD"; public static final String CALLBACK_CMD_PROXY = "CALLBACK_PROXY"; public static final String CALLBACK_CMD_STOP = "CALLBACK_STOP"; public static final String UPLOAD_FILE_CHANNEL = "UPLOAD_FILE_CHANNEL"; src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -447,7 +447,7 @@ } subscribe.removeSubscribe(hookSubscribe); }); Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc()); Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc()); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> { ResponseEvent responseEvent = (ResponseEvent) event.event; @@ -566,7 +566,7 @@ }); }); Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc()); Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc()); if (inviteStreamCallback != null) { inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null,callId, "rtp", ssrcInfo.getStream())); } src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -6,7 +6,9 @@ import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
@@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.GeneralCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; @@ -13,7 +14,7 @@ * 保存视频代理 * @param param */ StreamInfo save(StreamProxyItem param); void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback); /** * 添加视频代理到zlm src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -2,12 +2,16 @@ import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.GeneralCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; @@ -86,6 +90,9 @@ private IMediaServerService mediaServerService; @Autowired private ZlmHttpHookSubscribe hookSubscribe; @Autowired DataSourceTransactionManager dataSourceTransactionManager; @Autowired @@ -93,7 +100,7 @@ @Override public StreamInfo save(StreamProxyItem param) { public void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback) { MediaServerItem mediaInfo; if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){ mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null); @@ -107,7 +114,6 @@ String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(), param.getStream() ); param.setDst_url(dstUrl); StringBuffer resultMsg = new StringBuffer(); param.setMediaServerId(mediaInfo.getId()); boolean saveResult; // 更新 @@ -117,14 +123,25 @@ saveResult = addStreamProxy(param); } if (!saveResult) { throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败"); callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null); return; } StreamInfo resultForStreamInfo = null; resultMsg.append("保存成功"); HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, param.getApp(), param.getStream(), null, null); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); }); if (param.isEnable()) { JSONObject jsonObject = addStreamProxyToZlm(param); if (jsonObject == null || jsonObject.getInteger("code") != 0) { resultMsg.append(", 但是启用失败,请检查流地址是否可用"); if (jsonObject != null && jsonObject.getInteger("code") == 0) { hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, param.getApp(), param.getStream(), null, null); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); }else { param.setEnable(false); // 直接移除 if (param.isEnable_remove_none_reader()) { @@ -132,14 +149,15 @@ }else { updateStreamProxy(param); } }else { resultForStreamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, param.getApp(), param.getStream(), null, null); if (jsonObject == null){ callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null); return; }else { callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null); return; } } } return resultForStreamInfo; } /** @@ -233,6 +251,7 @@ result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrc_url(), param.getDst_url(), param.getTimeout_ms() + "", param.isEnable_audio(), param.isEnable_mp4(), param.getFfmpeg_cmd_key()); System.out.println(result); } return result; } src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java
@@ -1,13 +1,18 @@ package com.genersoft.iot.vmp.vmanager.streamProxy; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageInfo; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -18,6 +23,9 @@ import org.springframework.stereotype.Controller; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; import java.util.UUID; @SuppressWarnings("rawtypes") /** @@ -36,6 +44,12 @@ @Autowired private IStreamProxyService streamProxyService; @Autowired private DeferredResultHolder resultHolder; @Autowired private UserSetting userSetting; @Operation(summary = "分页查询流代理") @@ -58,7 +72,7 @@ }) @PostMapping(value = "/save") @ResponseBody public StreamContent save(@RequestBody StreamProxyItem param){ public DeferredResult<Object> save(@RequestBody StreamProxyItem param){ logger.info("添加代理: " + JSONObject.toJSONString(param)); if (ObjectUtils.isEmpty(param.getMediaServerId())) { param.setMediaServerId("auto"); @@ -69,7 +83,33 @@ if (ObjectUtils.isEmpty(param.getGbId())) { param.setGbId(null); } return new StreamContent(streamProxyService.save(param)); RequestMessage requestMessage = new RequestMessage(); String key = DeferredResultHolder.CALLBACK_CMD_PROXY + param.getApp() + param.getStream(); requestMessage.setKey(key); String uuid = UUID.randomUUID().toString(); requestMessage.setId(uuid); DeferredResult<Object> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); // 录像查询以channelId作为deviceId查询 resultHolder.put(key, uuid, result); result.onTimeout(()->{ WVPResult<StreamInfo> wvpResult = new WVPResult<>(); wvpResult.setCode(ErrorCode.ERROR100.getCode()); wvpResult.setMsg("超时"); requestMessage.setData(wvpResult); resultHolder.invokeAllResult(requestMessage); }); streamProxyService.save(param, (code, msg, streamInfo) -> { logger.info("[拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg); if (code == ErrorCode.SUCCESS.getCode()) { requestMessage.setData(new StreamContent(streamInfo)); }else { requestMessage.setData(WVPResult.fail(code, msg)); } resultHolder.invokeAllResult(requestMessage); }); return result; } @GetMapping(value = "/ffmpeg_cmd/list") src/main/resources/application.yml
@@ -2,4 +2,4 @@ application: name: wvp profiles: active: local active: local268