src/main/java/com/genersoft/iot/vmp/media/bean/ResultForOnPublish.java
New file @@ -0,0 +1,59 @@
package com.genersoft.iot.vmp.media.bean;

/**
 * Mutable bean describing how a publish (push) request should be handled once it has been
 * authenticated.
 *
 * <p>NOTE(review): the snake_case property names presumably mirror the keys of the media
 * server's publish-hook response; confirm before renaming any of them.
 */
public class ResultForOnPublish {

    /** Whether audio is enabled for the published stream. */
    private boolean enable_audio;

    /** Whether the published stream is recorded to MP4. */
    private boolean enable_mp4;

    /** Maximum length of a recorded MP4, in seconds. */
    private int mp4_max_second;

    /** Directory the MP4 recording is written to. */
    private String mp4_save_path;

    /** Replacement stream id, or {@code null} to keep the published one. */
    private String stream_replace;

    /** Timestamp-modification mode; {@code null} when not set. */
    private Integer modify_stamp;

    public boolean isEnable_audio() {
        return this.enable_audio;
    }

    public void setEnable_audio(boolean enable_audio) {
        this.enable_audio = enable_audio;
    }

    public boolean isEnable_mp4() {
        return this.enable_mp4;
    }

    public void setEnable_mp4(boolean enable_mp4) {
        this.enable_mp4 = enable_mp4;
    }

    public int getMp4_max_second() {
        return this.mp4_max_second;
    }

    public void setMp4_max_second(int mp4_max_second) {
        this.mp4_max_second = mp4_max_second;
    }

    public String getMp4_save_path() {
        return this.mp4_save_path;
    }

    public void setMp4_save_path(String mp4_save_path) {
        this.mp4_save_path = mp4_save_path;
    }

    public String getStream_replace() {
        return this.stream_replace;
    }

    public void setStream_replace(String stream_replace) {
        this.stream_replace = stream_replace;
    }

    public Integer getModify_stamp() {
        return this.modify_stamp;
    }

    public void setModify_stamp(Integer modify_stamp) {
        this.modify_stamp = modify_stamp;
    }
}
src/main/java/com/genersoft/iot/vmp/media/event/MediaArrivalEvent.java
New file @@ -0,0 +1,75 @@
package com.genersoft.iot.vmp.media.event;

import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import org.springframework.context.ApplicationEvent;

/**
 * Stream arrival event: published when a stream registers on a media server.
 *
 * <p>Carries the stream's app, stream id and schema, the media server it arrived on and the
 * media information derived from the hook parameters.
 */
public class MediaArrivalEvent extends ApplicationEvent {

    private MediaInfo mediaInfo;

    private String app;

    private String stream;

    private MediaServer mediaServer;

    private String schema;

    public MediaArrivalEvent(Object source) {
        super(source);
    }

    /**
     * Builds an arrival event from a stream-changed hook notification.
     *
     * @param source      the object publishing the event
     * @param hookParam   hook parameters describing the registered stream
     * @param mediaServer the media server the stream arrived on; may be {@code null} when the
     *                    caller could not resolve it
     * @return a fully populated event
     */
    public static MediaArrivalEvent getInstance(Object source, OnStreamChangedHookParam hookParam, MediaServer mediaServer) {
        MediaArrivalEvent event = new MediaArrivalEvent(source);
        event.setMediaInfo(MediaInfo.getInstance(hookParam));
        event.setApp(hookParam.getApp());
        event.setStream(hookParam.getStream());
        event.setMediaServer(mediaServer);
        event.setSchema(hookParam.getSchema());
        return event;
    }

    public MediaInfo getMediaInfo() {
        return mediaInfo;
    }

    public void setMediaInfo(MediaInfo mediaInfo) {
        this.mediaInfo = mediaInfo;
    }

    public String getApp() {
        return app;
    }

    public void setApp(String app) {
        this.app = app;
    }

    public String getStream() {
        return stream;
    }

    public void setStream(String stream) {
        this.stream = stream;
    }

    public MediaServer getMediaServer() {
        return mediaServer;
    }

    public void setMediaServer(MediaServer mediaServer) {
        this.mediaServer = mediaServer;
    }

    /**
     * Returns the id of the media server the stream arrived on.
     *
     * <p>The name (including its "Sever" spelling) matches the accessor that listeners of this
     * event already call.
     *
     * @return the media server id, or {@code null} when no media server is attached
     */
    public String getSeverId() {
        return mediaServer == null ? null : mediaServer.getId();
    }

    public String getSchema() {
        return schema;
    }

    public void setSchema(String schema) {
        this.schema = schema;
    }
}
src/main/java/com/genersoft/iot/vmp/media/event/MediaDepartureEvent.java
New file @@ -0,0 +1,65 @@
package com.genersoft.iot.vmp.media.event;

import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookListener;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import org.springframework.context.ApplicationEvent;

/**
 * Stream departure event: published when a stream unregisters from a media server.
 *
 * <p>Carries the stream's app, stream id and schema plus the media server it left.
 */
public class MediaDepartureEvent extends ApplicationEvent {

    private String app;

    private String stream;

    private MediaServer mediaServer;

    private String schema;

    public MediaDepartureEvent(Object source) {
        super(source);
    }

    /**
     * Builds a departure event from a stream-changed hook notification.
     *
     * @param source      the object publishing the event
     * @param hookParam   hook parameters describing the unregistered stream
     * @param mediaServer the media server the stream left; may be {@code null} when the caller
     *                    could not resolve it
     * @return a fully populated event
     */
    public static MediaDepartureEvent getInstance(Object source, OnStreamChangedHookParam hookParam, MediaServer mediaServer) {
        MediaDepartureEvent event = new MediaDepartureEvent(source);
        event.setApp(hookParam.getApp());
        event.setStream(hookParam.getStream());
        event.setSchema(hookParam.getSchema());
        event.setMediaServer(mediaServer);
        return event;
    }

    public String getApp() {
        return app;
    }

    public void setApp(String app) {
        this.app = app;
    }

    public String getStream() {
        return stream;
    }

    public void setStream(String stream) {
        this.stream = stream;
    }

    public MediaServer getMediaServer() {
        return mediaServer;
    }

    public void setMediaServer(MediaServer mediaServer) {
        this.mediaServer = mediaServer;
    }

    /**
     * Returns the id of the media server the stream left.
     *
     * <p>The name (including its "Sever" spelling) matches the accessor that listeners of this
     * event already call.
     *
     * @return the media server id, or {@code null} when no media server is attached
     */
    public String getSeverId() {
        return mediaServer == null ? null : mediaServer.getId();
    }

    public String getSchema() {
        return schema;
    }

    public void setSchema(String schema) {
        this.schema = schema;
    }
}
src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
@@ -8,11 +8,15 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.MediaServerChangeEvent; import com.genersoft.iot.vmp.media.event.MediaServerDeleteEvent; import com.genersoft.iot.vmp.media.service.IMediaNodeServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -30,7 +34,9 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.event.EventListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; @@ -72,6 +78,31 @@ /** * 流到来的处理 */ @Async("taskExecutor") @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { if ("rtsp".equals(event.getSchema())) { logger.info("流变化:注册 app->{}, stream->{}", event.getApp(), event.getStream()); addCount(event.getSeverId()); } } /** * 流离开的处理 */ @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaDepartureEvent event) { if ("rtsp".equals(event.getSchema())) { logger.info("流变化:注销, app->{}, stream->{}", event.getApp(), event.getStream()); removeCount(event.getSeverId()); } } /** * 初始化 */ @Override 
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -19,6 +19,9 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.ResultForOnPublish; import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; @@ -190,50 +193,7 @@ if (mediaServer == null) { return new HookResultForOnPublish(200, "success"); } // 推流鉴权的处理 if (!"rtp".equals(param.getApp())) { StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); if (stream != null) { HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); result.setEnable_audio(stream.isEnableAudio()); result.setEnable_mp4(stream.isEnableMp4()); return result; } if (userSetting.getPushAuthority()) { // 对于推流进行鉴权 Map<String, String> paramMap = urlParamToMap(param.getParams()); // 推流鉴权 if (param.getParams() == null) { logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)"); return new HookResultForOnPublish(401, "Unauthorized"); } String sign = paramMap.get("sign"); if (sign == null) { logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)"); return new HookResultForOnPublish(401, "Unauthorized"); } // 推流自定义播放鉴权码 String callId = paramMap.get("callId"); // 鉴权配置 boolean hasAuthority = userService.checkPushAuthority(callId, sign); if (!hasAuthority) { logger.info("推流鉴权失败: sign 无权限: callId={}. 
sign={}", callId, sign); return new HookResultForOnPublish(401, "Unauthorized"); } StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); streamAuthorityInfo.setCallId(callId); streamAuthorityInfo.setSign(sign); // 鉴权通过 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); } } else { zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId()); } HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); result.setEnable_audio(true); taskExecutor.execute(() -> { ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json); if (subscribe != null) { @@ -241,81 +201,16 @@ } }); // 是否录像 if ("rtp".equals(param.getApp())) { result.setEnable_mp4(userSetting.getRecordSip()); } else { result.setEnable_mp4(userSetting.isRecordPushLive()); ResultForOnPublish resultForOnPublish = mediaService.authenticatePublish(mediaServer, param.getApp(), param.getStream(), param.getParams()); if (resultForOnPublish != null) { HookResultForOnPublish successResult = HookResultForOnPublish.getInstance(resultForOnPublish); logger.info("[ZLM HOOK]推流鉴权 响应:{}->{}->>>>{}", param.getMediaServerId(), param, successResult); return successResult; }else { HookResultForOnPublish fail = HookResultForOnPublish.Fail(); logger.info("[ZLM HOOK]推流鉴权 响应:{}->{}->>>>{}", param.getMediaServerId(), param, fail); return fail; } // 国标流 if ("rtp".equals(param.getApp())) { InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); // 单端口模式下修改流 ID if (!mediaServer.isRtpEnable() && inviteInfo == null) { String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16)); inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc); if (inviteInfo != null) { result.setStream_replace(inviteInfo.getStream()); logger.info("[ZLM HOOK]推流鉴权 stream: {} 替换为 {}", param.getStream(), inviteInfo.getStream()); } } // 设置音频信息及录制信息 
List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, param.getStream()); if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) { // 为录制国标模拟一个鉴权信息, 方便后续写入录像文件时使用 StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); streamAuthorityInfo.setApp(param.getApp()); streamAuthorityInfo.setStream(ssrcTransactionForAll.get(0).getStream()); streamAuthorityInfo.setCallId(ssrcTransactionForAll.get(0).getSipTransactionInfo().getCallId()); redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), ssrcTransactionForAll.get(0).getStream(), streamAuthorityInfo); String deviceId = ssrcTransactionForAll.get(0).getDeviceId(); String channelId = ssrcTransactionForAll.get(0).getChannelId(); DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { result.setEnable_audio(deviceChannel.isHasAudio()); } // 如果是录像下载就设置视频间隔十秒 if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) { // 获取录像的总时长,然后设置为这个视频的时长 InviteInfo inviteInfoForDownload = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, param.getStream()); if (inviteInfoForDownload != null && inviteInfoForDownload.getStreamInfo() != null) { String startTime = inviteInfoForDownload.getStreamInfo().getStartTime(); String endTime = inviteInfoForDownload.getStreamInfo().getEndTime(); long difference = DateUtil.getDifference(startTime, endTime) / 1000; result.setMp4_max_second((int) difference); result.setEnable_mp4(true); // 设置为2保证得到的mp4的时长是正常的 result.setModify_stamp(2); } } // 如果是talk对讲,则默认获取声音 if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.TALK) { result.setEnable_audio(true); } } } else if (param.getApp().equals("broadcast")) { result.setEnable_audio(true); } else if (param.getApp().equals("talk")) { result.setEnable_audio(true); } if (param.getApp().equalsIgnoreCase("rtp")) { String receiveKey = 
VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + param.getStream(); OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo) redisTemplate.opsForValue().get(receiveKey); String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + param.getStream(); OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo) redisTemplate.opsForValue().get(receiveKeyForPS); if (otherRtpSendInfo != null || otherPsSendInfo != null) { result.setEnable_mp4(true); } } logger.info("[ZLM HOOK]推流鉴权 响应:{}->{}->>>>{}", param.getMediaServerId(), param, result); return result; } @@ -326,11 +221,20 @@ @PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8") public HookResult onStreamChanged(@RequestBody OnStreamChangedHookParam param) { MediaServer mediaServer = mediaServerService.getOne(param.getMediaServerId()); if (param.isRegist()) { logger.info("[ZLM HOOK] 流注册, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); MediaArrivalEvent mediaArrivalEvent = MediaArrivalEvent.getInstance(this, param, mediaServer); applicationEventPublisher.publishEvent(mediaArrivalEvent); } else { logger.info("[ZLM HOOK] 流注销, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); MediaDepartureEvent mediaArrivalEvent = MediaDepartureEvent.getInstance(this, param, mediaServer); applicationEventPublisher.publishEvent(mediaArrivalEvent); } return HookResult.SUCCESS(); JSONObject json = (JSONObject) JSON.toJSON(param); taskExecutor.execute(() -> { src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java
@@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.media.zlm.dto; import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnPublishHookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; @@ -97,21 +98,23 @@ this.sign = sign; } public static StreamAuthorityInfo getInstanceByHook(OnPublishHookParam hookParam) { public static StreamAuthorityInfo getInstanceByHook(String app, String stream, String id) { StreamAuthorityInfo streamAuthorityInfo = new StreamAuthorityInfo(); streamAuthorityInfo.setApp(hookParam.getApp()); streamAuthorityInfo.setStream(hookParam.getStream()); streamAuthorityInfo.setId(hookParam.getId()); streamAuthorityInfo.setApp(app); streamAuthorityInfo.setStream(stream); streamAuthorityInfo.setId(id); return streamAuthorityInfo; } public static StreamAuthorityInfo getInstanceByHook(OnStreamChangedHookParam onStreamChangedHookParam) { public static StreamAuthorityInfo getInstanceByHook(MediaArrivalEvent event) { StreamAuthorityInfo streamAuthorityInfo = new StreamAuthorityInfo(); streamAuthorityInfo.setApp(onStreamChangedHookParam.getApp()); streamAuthorityInfo.setStream(onStreamChangedHookParam.getStream()); streamAuthorityInfo.setId(onStreamChangedHookParam.getMediaServerId()); streamAuthorityInfo.setOriginType(onStreamChangedHookParam.getOriginType()); streamAuthorityInfo.setOriginTypeStr(onStreamChangedHookParam.getOriginTypeStr()); streamAuthorityInfo.setApp(event.getApp()); streamAuthorityInfo.setStream(event.getStream()); streamAuthorityInfo.setId(event.getSeverId()); if (event.getMediaInfo() != null) { streamAuthorityInfo.setOriginType(event.getMediaInfo().getOriginType()); } return streamAuthorityInfo; } } src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java
@@ -18,8 +18,8 @@ return new HookResult(0, "success"); } public static HookResult Fail(){ return new HookResult(-1, "fail"); public static HookResultForOnPublish Fail(){ return new HookResultForOnPublish(-1, "fail"); } public int getCode() { src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java
@@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.media.zlm.dto.hook; import com.genersoft.iot.vmp.media.bean.ResultForOnPublish; public class HookResultForOnPublish extends HookResult{ private boolean enable_audio; @@ -16,6 +18,17 @@ return new HookResultForOnPublish(0, "success"); } public static HookResultForOnPublish getInstance(ResultForOnPublish resultForOnPublish){ HookResultForOnPublish successResult = new HookResultForOnPublish(0, "success"); successResult.setEnable_audio(resultForOnPublish.isEnable_audio()); successResult.setEnable_mp4(resultForOnPublish.isEnable_mp4()); successResult.setModify_stamp(resultForOnPublish.getModify_stamp()); successResult.setStream_replace(resultForOnPublish.getStream_replace()); successResult.setMp4_max_second(resultForOnPublish.getMp4_max_second()); successResult.setMp4_save_path(resultForOnPublish.getMp4_save_path()); return successResult; } public HookResultForOnPublish(int code, String msg) { setCode(code); setMsg(msg); src/main/java/com/genersoft/iot/vmp/service/IMediaService.java
@@ -2,6 +2,7 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.ResultForOnPublish; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; /** @@ -47,5 +48,5 @@ */ boolean authenticatePlay(String app, String stream, String callId); boolean authenticatePublish(String app, String stream, String callId, String sign); ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params); } src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java
@@ -6,13 +6,18 @@ import com.genersoft.iot.vmp.common.InviteSessionStatus; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.List; @@ -31,6 +36,35 @@ @Autowired private RedisTemplate<Object, Object> redisTemplate; @Autowired private IVideoManagerStorage storage; /** * 流到来的处理 */ @Async("taskExecutor") @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) { } } /** * 流离开的处理 */ @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaDepartureEvent event) { if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) { InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream()); if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) { removeInviteInfo(inviteInfo); stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); } } } @Override public void updateInviteInfo(InviteInfo inviteInfo) { if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) { src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
@@ -1,22 +1,43 @@ package com.genersoft.iot.vmp.service.impl; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.ResultForOnPublish; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookListener; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookType; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookResultForOnPublish; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo; import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; import 
java.util.HashMap; import java.util.List; import java.util.Map; @Service public class MediaServiceImpl implements IMediaService { @@ -31,6 +52,31 @@ @Autowired private MediaConfig mediaConfig; @Autowired private IStreamProxyService streamProxyService; @Autowired private UserSetting userSetting; @Autowired private RedisTemplate<Object, Object> redisTemplate; @Autowired private IUserService userService; @Autowired private IInviteStreamService inviteStreamService; @Autowired private VideoStreamSessionManager sessionManager; @Autowired private IVideoManagerStorage storager; @Autowired private ZLMMediaListManager zlmMediaListManager; @Override @@ -103,4 +149,146 @@ StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream); return (streamAuthorityInfo != null && streamAuthorityInfo.getCallId() != null && !streamAuthorityInfo.getCallId().equals(callId)); } @Override public ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params) { // 推流鉴权的处理 if (!"rtp".equals(app)) { StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream); if (streamProxyItem != null) { ResultForOnPublish result = new ResultForOnPublish(); result.setEnable_audio(streamProxyItem.isEnableAudio()); result.setEnable_mp4(streamProxyItem.isEnableMp4()); return result; } if (userSetting.getPushAuthority()) { // 对于推流进行鉴权 Map<String, String> paramMap = urlParamToMap(params); // 推流鉴权 if (params == null) { logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)"); throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized"); } String sign = paramMap.get("sign"); if (sign == null) { logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)"); throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized"); } // 推流自定义播放鉴权码 String callId = paramMap.get("callId"); // 鉴权配置 boolean hasAuthority = userService.checkPushAuthority(callId, sign); if (!hasAuthority) { 
logger.info("推流鉴权失败: sign 无权限: callId={}. sign={}", callId, sign); throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized"); } StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(app, stream, mediaServer.getId()); streamAuthorityInfo.setCallId(callId); streamAuthorityInfo.setSign(sign); // 鉴权通过 redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo); } } else { zlmMediaListManager.sendStreamEvent(app, stream, mediaServer.getId()); } ResultForOnPublish result = new ResultForOnPublish(); result.setEnable_audio(true); // 是否录像 if ("rtp".equals(app)) { result.setEnable_mp4(userSetting.getRecordSip()); } else { result.setEnable_mp4(userSetting.isRecordPushLive()); } // 国标流 if ("rtp".equals(app)) { InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream); // 单端口模式下修改流 ID if (!mediaServer.isRtpEnable() && inviteInfo == null) { String ssrc = String.format("%010d", Long.parseLong(stream, 16)); inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc); if (inviteInfo != null) { result.setStream_replace(inviteInfo.getStream()); logger.info("[ZLM HOOK]推流鉴权 stream: {} 替换为 {}", stream, inviteInfo.getStream()); } } // 设置音频信息及录制信息 List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, stream); if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) { // 为录制国标模拟一个鉴权信息, 方便后续写入录像文件时使用 StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(app, stream, mediaServer.getId()); streamAuthorityInfo.setApp(app); streamAuthorityInfo.setStream(ssrcTransactionForAll.get(0).getStream()); streamAuthorityInfo.setCallId(ssrcTransactionForAll.get(0).getSipTransactionInfo().getCallId()); redisCatchStorage.updateStreamAuthorityInfo(app, ssrcTransactionForAll.get(0).getStream(), streamAuthorityInfo); String deviceId = ssrcTransactionForAll.get(0).getDeviceId(); String channelId = 
ssrcTransactionForAll.get(0).getChannelId(); DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { result.setEnable_audio(deviceChannel.isHasAudio()); } // 如果是录像下载就设置视频间隔十秒 if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) { // 获取录像的总时长,然后设置为这个视频的时长 InviteInfo inviteInfoForDownload = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream); if (inviteInfoForDownload != null && inviteInfoForDownload.getStreamInfo() != null) { String startTime = inviteInfoForDownload.getStreamInfo().getStartTime(); String endTime = inviteInfoForDownload.getStreamInfo().getEndTime(); long difference = DateUtil.getDifference(startTime, endTime) / 1000; result.setMp4_max_second((int) difference); result.setEnable_mp4(true); // 设置为2保证得到的mp4的时长是正常的 result.setModify_stamp(2); } } // 如果是talk对讲,则默认获取声音 if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.TALK) { result.setEnable_audio(true); } } } else if (app.equals("broadcast")) { result.setEnable_audio(true); } else if (app.equals("talk")) { result.setEnable_audio(true); } if (app.equalsIgnoreCase("rtp")) { String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream; OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo) redisTemplate.opsForValue().get(receiveKey); String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + stream; OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo) redisTemplate.opsForValue().get(receiveKeyForPS); if (otherRtpSendInfo != null || otherPsSendInfo != null) { result.setEnable_mp4(true); } } return result; } private Map<String, String> urlParamToMap(String params) { HashMap<String, String> map = new HashMap<>(); if (ObjectUtils.isEmpty(params)) { return map; } String[] paramsArray = params.split("&"); if (paramsArray.length == 0) { return map; } for (String param : paramsArray) { String[] 
paramArray = param.split("="); if (paramArray.length == 2) { map.put(paramArray[0], paramArray[1]); } } return map; } } src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -17,17 +17,17 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRecordMp4; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; @@ -43,6 +43,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; @@ -121,6 +123,77 @@ @Autowired private SSRCFactory ssrcFactory; /** * 流到来的处理 */ @Async("taskExecutor") @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { if ("broadcast".equals(event.getApp())) { if (event.getStream().indexOf("_") > 0) { String[] streamArray = event.getStream().split("_"); if 
(streamArray.length == 2) { String deviceId = streamArray[0]; String channelId = streamArray[1]; Device device = deviceService.getDevice(deviceId); if (device == null) { logger.info("[语音对讲/喊话] 未找到设备:{}", deviceId); return; } if ("broadcast".equals(event.getApp())) { if (audioBroadcastManager.exit(deviceId, channelId)) { stopAudioBroadcast(deviceId, channelId); } // 开启语音对讲通道 try { audioBroadcastCmd(device, channelId, event.getMediaServer(), event.getApp(), event.getStream(), 60, false, (msg) -> { logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId); }); } catch (InvalidArgumentException | ParseException | SipException e) { logger.error("[命令发送失败] 语音对讲: {}", e.getMessage()); } }else if ("talk".equals(event.getApp())) { // 开启语音对讲通道 talkCmd(device, channelId, event.getMediaServer(), event.getStream(), (msg) -> { logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId); }); } } } } } /** * 流离开的处理 */ @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaDepartureEvent event) { if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) { if (event.getStream().indexOf("_") > 0) { String[] streamArray = event.getStream().split("_"); if (streamArray.length == 2) { String deviceId = streamArray[0]; String channelId = streamArray[1]; Device device = deviceService.getDevice(deviceId); if (device == null) { logger.info("[语音对讲/喊话] 未找到设备:{}", deviceId); return; } if ("broadcast".equals(event.getApp())) { stopAudioBroadcast(deviceId, channelId); }else if ("talk".equals(event.getApp())) { stopTalk(device, channelId, false); } } } } } @Override public SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) { src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -9,6 +9,8 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; @@ -33,7 +35,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; @@ -102,6 +106,28 @@ @Autowired TransactionDefinition transactionDefinition; /** * 流到来的处理 */ @Async("taskExecutor") @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { if ("rtsp".equals(event.getSchema())) { updateStatus(true, event.getApp(), event.getStream()); } } /** * 流离开的处理 */ @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaDepartureEvent event) { if ("rtsp".equals(event.getSchema())) { updateStatus(true, event.getApp(), event.getStream()); } } @Override public void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback) { src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -10,6 +10,10 @@ import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.MediaServerChangeEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; @@ -28,7 +32,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; @@ -85,6 +91,42 @@ @Autowired private MediaConfig mediaConfig; /** * 流到来的处理 */ @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaArrivalEvent event) { MediaInfo mediaInfo = event.getMediaInfo(); if (mediaInfo == null) { return; } if (mediaInfo.getOriginType() != OriginType.RTMP_PUSH.ordinal() && mediaInfo.getOriginType() != OriginType.RTSP_PUSH.ordinal() && mediaInfo.getOriginType() != OriginType.RTC_PUSH.ordinal()) { return; } StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream()); if (streamAuthorityInfo == null) { streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(event); } else { streamAuthorityInfo.setOriginType(mediaInfo.getOriginType()); } redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo); } /** * 流离开的处理 */ 
@Async("taskExecutor") @EventListener public void onApplicationEvent(MediaDepartureEvent event) { } private List<StreamPushItem> handleJSON(List<StreamInfo> streamInfoList) { if (streamInfoList == null || streamInfoList.isEmpty()) {