src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java
@@ -109,12 +109,18 @@ for (String deviceId : keys) { CatalogData catalogData = data.get(deviceId); if ( catalogData.getLastTime().isBefore(instantBefore5S)) { // 超过五秒收不到消息任务超时, 只更新这一部分数据 if ( catalogData.getLastTime().isBefore(instantBefore5S)) { // 超过五秒收不到消息任务超时, 只更新这一部分数据, 收到数据与声明的总数一致,则重置通道数据,数据不全则只对收到的数据做更新操作 if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) { if (catalogData.getTotal() == catalogData.getChannelList().size()) { storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList()); if (catalogData.getTotal() != catalogData.getChannelList().size()) { }else { storager.updateChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList()); } String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条"; catalogData.setErrorMsg(errorMsg); if (catalogData.getTotal() != catalogData.getChannelList().size()) { } }else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) { String errorMsg = "同步失败,等待回复超时"; src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
@@ -3,7 +3,6 @@ import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; import org.apache.commons.lang3.ArrayUtils; @@ -14,19 +13,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import javax.sip.*; import javax.sip.address.Address; import javax.sip.address.AddressFactory; import javax.sip.address.SipURI; import javax.sip.header.*; import javax.sip.header.ContentTypeHeader; import javax.sip.header.ExpiresHeader; import javax.sip.header.HeaderFactory; import javax.sip.message.MessageFactory; import javax.sip.message.Request; import javax.sip.message.Response; import java.io.ByteArrayInputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; @@ -43,15 +40,6 @@ @Autowired private SIPSender sipSender; public AddressFactory getAddressFactory() { try { return SipFactory.getInstance().createAddressFactory(); } catch (PeerUnavailableException e) { e.printStackTrace(); } return null; } public HeaderFactory getHeaderFactory() { try { @@ -92,53 +80,6 @@ public SIPResponse responseAck(SIPRequest sipRequest, int statusCode, String msg) throws SipException, InvalidArgumentException, ParseException { return responseAck(sipRequest, statusCode, msg, null); } // public SIPResponse responseAck(ServerTransaction serverTransaction, int statusCode, String msg, ResponseAckExtraParam responseAckExtraParam) throws SipException, InvalidArgumentException, ParseException { // if (serverTransaction == null) { // logger.warn("[回复消息] ServerTransaction 为null"); // return null; // } // ToHeader toHeader = 
(ToHeader) serverTransaction.getRequest().getHeader(ToHeader.NAME); // if (toHeader.getTag() == null) { // toHeader.setTag(SipUtils.getNewTag()); // } // SIPResponse response = (SIPResponse)getMessageFactory().createResponse(statusCode, serverTransaction.getRequest()); // if (msg != null) { // response.setReasonPhrase(msg); // } // if (responseAckExtraParam != null) { // if (responseAckExtraParam.sipURI != null && serverTransaction.getRequest().getMethod().equals(Request.INVITE)) { // logger.debug("responseSdpAck SipURI: {}:{}", responseAckExtraParam.sipURI.getHost(), responseAckExtraParam.sipURI.getPort()); // Address concatAddress = SipFactory.getInstance().createAddressFactory().createAddress( // SipFactory.getInstance().createAddressFactory().createSipURI(responseAckExtraParam.sipURI.getUser(), responseAckExtraParam.sipURI.getHost()+":"+responseAckExtraParam.sipURI.getPort() // )); // response.addHeader(SipFactory.getInstance().createHeaderFactory().createContactHeader(concatAddress)); // } // if (responseAckExtraParam.contentTypeHeader != null) { // response.setContent(responseAckExtraParam.content, responseAckExtraParam.contentTypeHeader); // } // // if (serverTransaction.getRequest().getMethod().equals(Request.SUBSCRIBE)) { // if (responseAckExtraParam.expires == -1) { // logger.error("[参数不全] 2xx的SUBSCRIBE回复,必须设置Expires header"); // }else { // ExpiresHeader expiresHeader = SipFactory.getInstance().createHeaderFactory().createExpiresHeader(responseAckExtraParam.expires); // response.addHeader(expiresHeader); // } // } // }else { // if (serverTransaction.getRequest().getMethod().equals(Request.SUBSCRIBE)) { // logger.error("[参数不全] 2xx的SUBSCRIBE回复,必须设置Expires header"); // } // } // serverTransaction.sendResponse(response); // if (statusCode >= 200 && !"NOTIFY".equalsIgnoreCase(serverTransaction.getRequest().getMethod())) { // if (serverTransaction.getDialog() != null) { // serverTransaction.getDialog().delete(); // } // } // return response; // } public 
SIPResponse responseAck(SIPRequest sipRequest, int statusCode, String msg, ResponseAckExtraParam responseAckExtraParam) throws SipException, InvalidArgumentException, ParseException { if (sipRequest.getToHeader().getTag() == null) { src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -228,7 +228,7 @@ } return; } else { logger.info("通道不存在,返回404"); logger.info("通道不存在,返回404: {}", channelId); try { // 通道不存在,发404,资源不存在 responseAck(request, Response.NOT_FOUND); src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -8,6 +8,8 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.media.zlm.dto.HookType; @@ -18,7 +20,10 @@ import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -26,6 +31,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; import javax.servlet.http.HttpServletRequest; import javax.sip.InvalidArgumentException; @@ -34,6 +40,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; /** * @description:针对 ZLMediaServer的hook事件监听 @@ -71,7 +78,7 @@ private IStreamProxyService streamProxyService; @Autowired private IStreamPushService streamPushService; private DeferredResultHolder resultHolder; @Autowired private IMediaService mediaService; @@ -103,11 +110,10 @@ /** * 服务器定时上报时间,上报间隔可配置,默认10s上报一次 * */ @ResponseBody @PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8") public JSONObject onServerKeepalive(@RequestBody 
OnServerKeepaliveHookParam param){ public HookResult onServerKeepalive(@RequestBody OnServerKeepaliveHookParam param) { logger.info("[ZLM HOOK] 收到zlm心跳:" + param.getMediaServerId()); @@ -122,20 +128,15 @@ }); mediaServerService.updateMediaServerKeepalive(param.getMediaServerId(), param.getData()); JSONObject ret = new JSONObject(); ret.put("code", 0); ret.put("msg", "success"); return ret; return HookResult.SUCCESS(); } /** * 播放器鉴权事件,rtsp/rtmp/http-flv/ws-flv/hls的播放都将触发此鉴权事件。 * */ @ResponseBody @PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8") public JSONObject onPlay(@RequestBody OnPlayHookParam param){ public HookResult onPlay(@RequestBody OnPlayHookParam param) { if (logger.isDebugEnabled()) { logger.debug("[ZLM HOOK] 播放鉴权:{}->{}" + param.getMediaServerId(), param); } @@ -151,34 +152,28 @@ } } }); JSONObject ret = new JSONObject(); if (!"rtp".equals(param.getApp())) { Map<String, String> paramMap = urlParamToMap(param.getParams()); StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); if (streamAuthorityInfo != null && streamAuthorityInfo.getCallId() != null && !streamAuthorityInfo.getCallId().equals(paramMap.get("callId"))) { ret.put("code", 401); ret.put("msg", "Unauthorized"); return ret; return new HookResult(401, "Unauthorized"); } } ret.put("code", 0); ret.put("msg", "success"); return ret; return HookResult.SUCCESS(); } /** * rtsp/rtmp/rtp推流鉴权事件。 * */ @ResponseBody @PostMapping(value = "/on_publish", produces = "application/json;charset=UTF-8") public JSONObject onPublish(@RequestBody OnPublishHookParam param) { public HookResultForOnPublish onPublish(@RequestBody OnPublishHookParam param) { JSONObject json = (JSONObject) JSON.toJSON(param); logger.info("[ZLM HOOK]推流鉴权:{}->{}", param.getMediaServerId(), param); JSONObject ret = new JSONObject(); String mediaServerId = json.getString("mediaServerId"); MediaServerItem mediaInfo = 
mediaServerService.getOne(mediaServerId); @@ -187,17 +182,13 @@ // 推流鉴权 if (param.getParams() == null) { logger.info("推流鉴权失败: 缺少不要参数:sign=md5(user表的pushKey)"); ret.put("code", 401); ret.put("msg", "Unauthorized"); return ret; return new HookResultForOnPublish(401, "Unauthorized"); } Map<String, String> paramMap = urlParamToMap(param.getParams()); String sign = paramMap.get("sign"); if (sign == null) { logger.info("推流鉴权失败: 缺少不要参数:sign=md5(user表的pushKey)"); ret.put("code", 401); ret.put("msg", "Unauthorized"); return ret; return new HookResultForOnPublish(401, "Unauthorized"); } // 推流自定义播放鉴权码 String callId = paramMap.get("callId"); @@ -205,9 +196,7 @@ boolean hasAuthority = userService.checkPushAuthority(callId, sign); if (!hasAuthority) { logger.info("推流鉴权失败: sign 无权限: callId={}. sign={}", callId, sign); ret.put("code", 401); ret.put("msg", "Unauthorized"); return ret; return new HookResultForOnPublish(401, "Unauthorized"); } StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); streamAuthorityInfo.setCallId(callId); @@ -225,11 +214,10 @@ zlmMediaListManager.sendStreamEvent(param.getApp(),param.getStream(), param.getMediaServerId()); } ret.put("code", 0); ret.put("msg", "success"); HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); if (!"rtp".equals(param.getApp())) { ret.put("enable_audio", true); result.setEnable_audio(true); } taskExecutor.execute(()->{ @@ -238,16 +226,15 @@ if (mediaInfo != null) { subscribe.response(mediaInfo, json); }else { ret.put("code", 1); ret.put("msg", "zlm not register"); new HookResultForOnPublish(1, "zlm not register"); } } }); if ("rtp".equals(param.getApp())) { ret.put("enable_mp4", userSetting.getRecordSip()); result.setEnable_mp4(userSetting.getRecordSip()); }else { ret.put("enable_mp4", userSetting.isRecordPushLive()); result.setEnable_mp4(userSetting.isRecordPushLive()); } List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, 
param.getStream()); if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) { @@ -255,25 +242,24 @@ String channelId = ssrcTransactionForAll.get(0).getChannelId(); DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { ret.put("enable_audio", deviceChannel.isHasAudio()); result.setEnable_audio(deviceChannel.isHasAudio()); } // 如果是录像下载就设置视频间隔十秒 if (ssrcTransactionForAll.get(0).getType() == VideoStreamSessionManager.SessionType.download) { ret.put("mp4_max_second", 10); ret.put("enable_mp4", true); ret.put("enable_audio", true); result.setMp4_max_second(10); result.setEnable_audio(true); result.setEnable_mp4(true); } } return ret; return result; } /** * rtsp/rtmp流注册或注销时触发此事件;此事件对回复不敏感。 * */ @ResponseBody @PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8") public JSONObject onStreamChanged(@RequestBody OnStreamChangedHookParam param){ public HookResult onStreamChanged(@RequestBody OnStreamChangedHookParam param) { if (param.isRegist()) { logger.info("[ZLM HOOK] 流注册, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); @@ -397,7 +383,8 @@ }else { cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); } } catch (SipException | InvalidArgumentException | ParseException | SsrcTransactionNotFoundException e) { } catch (SipException | InvalidArgumentException | ParseException | SsrcTransactionNotFoundException e) { logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } } @@ -407,15 +394,11 @@ } }); JSONObject ret = new JSONObject(); ret.put("code", 0); ret.put("msg", "success"); return ret; return HookResult.SUCCESS(); } /** * 流无人观看时事件,用户可以通过此事件选择是否关闭无人看的流。 * */ @ResponseBody @PostMapping(value = "/on_stream_none_reader", produces = "application/json;charset=UTF-8") @@ -452,7 +435,8 @@ try { cmder.streamByeCmd(device, streamInfoForPlayCatch.getChannelId(), 
streamInfoForPlayCatch.getStream(), null); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { logger.error("[无人观看]点播, 发送BYE失败 {}", e.getMessage()); } } @@ -523,51 +507,91 @@ /** * 流未找到事件,用户可以在此事件触发时,立即去拉流,这样可以实现按需拉流;此事件对回复不敏感。 * */ @ResponseBody @PostMapping(value = "/on_stream_not_found", produces = "application/json;charset=UTF-8") public JSONObject onStreamNotFound(@RequestBody OnStreamNotFoundHookParam param){ public DeferredResult<HookResult> onStreamNotFound(@RequestBody OnStreamNotFoundHookParam param) { logger.info("[ZLM HOOK] 流未找到:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); taskExecutor.execute(()->{ DeferredResult<HookResult> defaultResult = new DeferredResult<>(); MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); if (userSetting.isAutoApplyPlay() && mediaInfo != null) { if (!userSetting.isAutoApplyPlay() || mediaInfo == null) { defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg())); return defaultResult; } if ("rtp".equals(param.getApp())) { if (mediaInfo.isRtpEnable()) { String[] s = param.getStream().split("_"); if (s.length == 2) { if (!mediaInfo.isRtpEnable() || s.length != 2) { defaultResult.setResult(HookResult.SUCCESS()); return defaultResult; } String deviceId = s[0]; String channelId = s[1]; Device device = redisCatchStorage.getDevice(deviceId); if (device != null) { playService.play(mediaInfo,deviceId, channelId, null, null, null); if (device == null) { defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg())); return defaultResult; } DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel == null) { defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), 
ErrorCode.ERROR404.getMsg())); return defaultResult; } logger.info("[ZLM HOOK] 流未找到, 发起自动点播:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); RequestMessage msg = new RequestMessage(); String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; boolean exist = resultHolder.exist(key, null); msg.setKey(key); String uuid = UUID.randomUUID().toString(); msg.setId(uuid); DeferredResult<HookResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); DeferredResultEx<HookResult> deferredResultEx = new DeferredResultEx<>(result); result.onTimeout(() -> { logger.info("点播接口等待超时"); // 释放rtpserver msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "点播超时")); resultHolder.invokeResult(msg); }); // TODO 在点播未成功的情况下在此调用接口点播会导致返回的流地址ip错误 deferredResultEx.setFilter(result1 -> { WVPResult<StreamInfo> wvpResult1 = (WVPResult<StreamInfo>) result1; HookResult resultForEnd = new HookResult(); resultForEnd.setCode(wvpResult1.getCode()); resultForEnd.setMsg(wvpResult1.getMsg()); return resultForEnd; }); // 录像查询以channelId作为deviceId查询 resultHolder.put(key, uuid, deferredResultEx); if (!exist) { playService.play(mediaInfo, deviceId, channelId, null, eventResult -> { msg.setData(new HookResult(eventResult.statusCode, eventResult.msg)); resultHolder.invokeResult(msg); }, null); } return result; }else { // 拉流代理 StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnable_disable_none_reader()) { streamProxyService.start(param.getApp(), param.getStream()); } DeferredResult<HookResult> result = new DeferredResult<>(); result.setResult(HookResult.SUCCESS()); return result; } } }); JSONObject ret = new JSONObject(); ret.put("code", 0); ret.put("msg", "success"); return ret; } /** * 服务器启动事件,可以用于监听服务器崩溃重启;此事件对回复不敏感。 * */ @ResponseBody @PostMapping(value = 
"/on_server_started", produces = "application/json;charset=UTF-8") public JSONObject onServerStarted(HttpServletRequest request, @RequestBody JSONObject jsonObject){ public HookResult onServerStarted(HttpServletRequest request, @RequestBody JSONObject jsonObject) { jsonObject.put("ip", request.getRemoteAddr()); ZLMServerConfig zlmServerConfig = JSON.to(ZLMServerConfig.class, jsonObject); @@ -583,10 +607,7 @@ mediaServerService.zlmServerOnline(zlmServerConfig); }); JSONObject ret = new JSONObject(); ret.put("code", 0); ret.put("msg", "success"); return ret; return HookResult.SUCCESS(); } /** @@ -594,17 +615,13 @@ */ @ResponseBody @PostMapping(value = "/on_send_rtp_stopped", produces = "application/json;charset=UTF-8") public JSONObject onSendRtpStopped(HttpServletRequest request, @RequestBody OnSendRtpStoppedHookParam param){ public HookResult onSendRtpStopped(HttpServletRequest request, @RequestBody OnSendRtpStoppedHookParam param) { logger.info("[ZLM HOOK] rtp发送关闭:{}->{}/{}", param.getMediaServerId(), param.getApp(), param.getStream()); JSONObject ret = new JSONObject(); ret.put("code", 0); ret.put("msg", "success"); // 查找对应的上级推流,发送停止 if (!"rtp".equals(param.getApp())) { return ret; return HookResult.SUCCESS(); } taskExecutor.execute(()->{ List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); @@ -622,8 +639,7 @@ } }); return ret; return HookResult.SUCCESS(); } /** @@ -631,12 +647,8 @@ */ @ResponseBody @PostMapping(value = "/on_rtp_server_timeout", produces = "application/json;charset=UTF-8") public JSONObject onRtpServerTimeout(HttpServletRequest request, @RequestBody OnRtpServerTimeoutHookParam param){ public HookResult onRtpServerTimeout(HttpServletRequest request, @RequestBody OnRtpServerTimeoutHookParam param) { logger.info("[ZLM HOOK] rtpServer收流超时:{}->{}({})", param.getMediaServerId(), param.getStream_id(), param.getSsrc()); JSONObject ret = new JSONObject(); ret.put("code", 0); ret.put("msg", "success"); 
taskExecutor.execute(()->{ JSONObject json = (JSONObject) JSON.toJSON(param); @@ -648,7 +660,7 @@ } }); return ret; return HookResult.SUCCESS(); } private Map<String, String> urlParamToMap(String params) { src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java
New file @@ -0,0 +1,36 @@
package com.genersoft.iot.vmp.media.zlm.dto.hook;

/**
 * Simple result holder made of a numeric {@code code} and a text {@code msg}.
 *
 * NOTE(review): presumably serialized to JSON as the reply body of a media-server
 * hook request; if so, the property names are part of the wire contract and must
 * not be renamed — confirm against the controller that returns this type.
 */
public class HookResult {

    /** Status code; {@link #SUCCESS()} uses 0. */
    private int code;

    /** Message accompanying the status code. */
    private String msg;

    /** Creates a result with {@code code == 0} and {@code msg == null} (field defaults). */
    public HookResult() {
    }

    /**
     * Creates a result with the given status code and message.
     *
     * @param code status code to report
     * @param msg  message to report
     */
    public HookResult(int code, String msg) {
        this.code = code;
        this.msg = msg;
    }

    /**
     * Builds a fresh result with code 0 and message "success".
     *
     * @return a new instance on every call, so callers may mutate it freely
     */
    public static HookResult SUCCESS(){
        return new HookResult(0, "success");
    }

    /** @return the status code */
    public int getCode() {
        return code;
    }

    /** @param code the status code to set */
    public void setCode(int code) {
        this.code = code;
    }

    /** @return the message, possibly {@code null} */
    public String getMsg() {
        return msg;
    }

    /** @param msg the message to set */
    public void setMsg(String msg) {
        this.msg = msg;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java
New file @@ -0,0 +1,44 @@
package com.genersoft.iot.vmp.media.zlm.dto.hook;

/**
 * {@link HookResult} extended with three extra options: audio on/off, MP4
 * recording on/off and a maximum MP4 length.
 *
 * The snake_case field and accessor names are kept exactly as they are.
 * NOTE(review): they presumably mirror the keys expected in the on_publish hook
 * reply — confirm before renaming anything.
 */
public class HookResultForOnPublish extends HookResult{

    /** Audio option; {@code false} unless set. */
    private boolean enable_audio;

    /** MP4 recording option; {@code false} unless set. */
    private boolean enable_mp4;

    /** Maximum MP4 length; 0 unless set. NOTE(review): unit presumably seconds — confirm. */
    private int mp4_max_second;

    /** Creates a result with all fields at their defaults. */
    public HookResultForOnPublish() {
    }

    /**
     * Builds a fresh result with code 0 and message "success"; the extra options
     * keep their defaults until the caller sets them.
     *
     * @return a new mutable instance on every call
     */
    public static HookResultForOnPublish SUCCESS(){
        return new HookResultForOnPublish(0, "success");
    }

    /**
     * Creates a result with the given status code and message.
     *
     * @param code status code to report
     * @param msg  message to report
     */
    public HookResultForOnPublish(int code, String msg) {
        // Delegate to the parent constructor rather than calling the overridable
        // setters, which could run subclass code on a half-constructed object.
        super(code, msg);
    }

    /** @return the audio option */
    public boolean isEnable_audio() {
        return enable_audio;
    }

    /** @param enable_audio the audio option to set */
    public void setEnable_audio(boolean enable_audio) {
        this.enable_audio = enable_audio;
    }

    /** @return the MP4 recording option */
    public boolean isEnable_mp4() {
        return enable_mp4;
    }

    /** @param enable_mp4 the MP4 recording option to set */
    public void setEnable_mp4(boolean enable_mp4) {
        this.enable_mp4 = enable_mp4;
    }

    /** @return the maximum MP4 length */
    public int getMp4_max_second() {
        return mp4_max_second;
    }

    /** @param mp4_max_second the maximum MP4 length to set */
    public void setMp4_max_second(int mp4_max_second) {
        this.mp4_max_second = mp4_max_second;
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java
@@ -152,6 +152,10 @@ @Override public void sendCatalogMsg(GbStream gbStream, String type) { if (gbStream == null || type == null) { logger.warn("[发送目录订阅]类型:流信息或类型为NULL"); return; } List<GbStream> gbStreams = new ArrayList<>(); if (gbStream.getGbId() != null) { gbStreams.add(gbStream); src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -184,7 +184,9 @@ @Override public boolean stop(String app, String streamId) { StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); if (streamPushItem != null) { gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); } platformGbStreamMapper.delByAppAndStream(app, streamId); gbStreamMapper.del(app, streamId); src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java
@@ -2,7 +2,6 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo; import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; @@ -324,6 +323,8 @@ */ boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList); boolean updateChannels(String deviceId, List<DeviceChannel> deviceChannelList); /** * 获取目录信息 * @param platformId src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
@@ -194,6 +194,119 @@ } @Override public boolean updateChannels(String deviceId, List<DeviceChannel> deviceChannelList) { if (CollectionUtils.isEmpty(deviceChannelList)) { return false; } List<DeviceChannel> allChannels = deviceChannelMapper.queryAllChannels(deviceId); Map<String,DeviceChannel> allChannelMap = new ConcurrentHashMap<>(); if (allChannels.size() > 0) { for (DeviceChannel deviceChannel : allChannels) { allChannelMap.put(deviceChannel.getChannelId(), deviceChannel); } } List<DeviceChannel> addChannels = new ArrayList<>(); List<DeviceChannel> updateChannels = new ArrayList<>(); TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); // 数据去重 StringBuilder stringBuilder = new StringBuilder(); Map<String, Integer> subContMap = new HashMap<>(); if (deviceChannelList.size() > 0) { // 数据去重 Set<String> gbIdSet = new HashSet<>(); for (DeviceChannel deviceChannel : deviceChannelList) { if (!gbIdSet.contains(deviceChannel.getChannelId())) { gbIdSet.add(deviceChannel.getChannelId()); if (allChannelMap.containsKey(deviceChannel.getChannelId())) { deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId()); deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio()); updateChannels.add(deviceChannel); }else { addChannels.add(deviceChannel); } if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) { if (subContMap.get(deviceChannel.getParentId()) == null) { subContMap.put(deviceChannel.getParentId(), 1); }else { Integer count = subContMap.get(deviceChannel.getParentId()); subContMap.put(deviceChannel.getParentId(), count++); } } }else { stringBuilder.append(deviceChannel.getChannelId()).append(","); } } if (addChannels.size() > 0) { for (DeviceChannel channel : addChannels) { if (subContMap.get(channel.getChannelId()) != null){ channel.setSubCount(subContMap.get(channel.getChannelId())); } } } if (updateChannels.size() > 0) { for (DeviceChannel channel : 
updateChannels) { if (subContMap.get(channel.getChannelId()) != null){ channel.setSubCount(subContMap.get(channel.getChannelId())); } } } } if (stringBuilder.length() > 0) { logger.info("[目录查询]收到的数据存在重复: {}" , stringBuilder); } if(CollectionUtils.isEmpty(updateChannels) && CollectionUtils.isEmpty(addChannels) ){ logger.info("通道更新,数据为空={}" , deviceChannelList); return false; } try { int limitCount = 300; boolean result = false; if (addChannels.size() > 0) { if (addChannels.size() > limitCount) { for (int i = 0; i < addChannels.size(); i += limitCount) { int toIndex = i + limitCount; if (i + limitCount > addChannels.size()) { toIndex = addChannels.size(); } result = result || deviceChannelMapper.batchAdd(addChannels.subList(i, toIndex)) < 0; } }else { result = result || deviceChannelMapper.batchAdd(addChannels) < 0; } } if (updateChannels.size() > 0) { if (updateChannels.size() > limitCount) { for (int i = 0; i < updateChannels.size(); i += limitCount) { int toIndex = i + limitCount; if (i + limitCount > updateChannels.size()) { toIndex = updateChannels.size(); } result = result || deviceChannelMapper.batchUpdate(updateChannels.subList(i, toIndex)) < 0; } }else { result = result || deviceChannelMapper.batchUpdate(updateChannels) < 0; } } if (result) { //事务回滚 dataSourceTransactionManager.rollback(transactionStatus); }else { //手动提交 dataSourceTransactionManager.commit(transactionStatus); } return true; }catch (Exception e) { e.printStackTrace(); dataSourceTransactionManager.rollback(transactionStatus); return false; } } @Override public void deviceChannelOnline(String deviceId, String channelId) { deviceChannelMapper.online(deviceId, channelId); src/main/java/com/genersoft/iot/vmp/vmanager/user/UserController.java
@@ -11,17 +11,13 @@ import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageInfo; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.tags.Tag; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.security.authentication.AuthenticationManager; import org.springframework.util.DigestUtils; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.*; import javax.security.sasl.AuthenticationException; @@ -90,7 +86,7 @@ @PostMapping("/add") @Operation(summary = "停止视频回放") @Operation(summary = "添加用户") @Parameter(name = "username", description = "用户名", required = true) @Parameter(name = "password", description = "密码(未md5加密的密码)", required = true) @Parameter(name = "roleId", description = "角色ID", required = true)