pom.xml
@@ -11,7 +11,7 @@ <groupId>com.genersoft</groupId> <artifactId>wvp-pro</artifactId> <version>2.3.2</version> <version>2.6.6</version> <name>web video platform</name> <description>国标28181视频平台</description> src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
@@ -1,10 +1,7 @@ package com.genersoft.iot.vmp.conf; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Component; @@ -101,12 +98,14 @@ } } public void stop(String key) { if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) { futureMap.get(key).cancel(false); public boolean stop(String key) { boolean result = false; if (futureMap.get(key) != null && !futureMap.get(key).isCancelled() && !futureMap.get(key).isDone()) { result = futureMap.get(key).cancel(false); futureMap.remove(key); runnableMap.remove(key); } return result; } public boolean contains(String key) { src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java
@@ -2,13 +2,11 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.gb28181.device.DeviceQuery; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; import java.net.InetAddress; import java.net.UnknownHostException; @@ -74,10 +72,6 @@ @Value("${media.rtp.port-range}") private String rtpPortRange; @Value("${media.rtp.send-port-range}") private String sendRtpPortRange; @Value("${media.record-assist-port:0}") private Integer recordAssistPort = 0; @@ -191,10 +185,6 @@ return sipDomain; } public String getSendRtpPortRange() { return sendRtpPortRange; } public MediaServerItem getMediaSerItem(){ MediaServerItem mediaServerItem = new MediaServerItem(); mediaServerItem.setId(id); @@ -214,9 +204,8 @@ mediaServerItem.setSecret(secret); mediaServerItem.setRtpEnable(rtpEnable); mediaServerItem.setRtpPortRange(rtpPortRange); mediaServerItem.setSendRtpPortRange(sendRtpPortRange); mediaServerItem.setRecordAssistPort(recordAssistPort); mediaServerItem.setHookAliveInterval(120); mediaServerItem.setHookAliveInterval(30.00f); mediaServerItem.setCreateTime(DateUtil.getNow()); mediaServerItem.setUpdateTime(DateUtil.getNow()); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -8,14 +8,13 @@ import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; @@ -39,9 +38,10 @@ import org.springframework.stereotype.Component; import javax.sdp.*; import javax.sip.*; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; import javax.sip.header.CallIdHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; @@ -656,6 +656,7 @@ if (!platform.isStartOfflinePush()) { // 平台设置中关闭了拉起离线的推流则直接回复 try { logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream()); responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
@@ -85,7 +85,7 @@ String password = (device != null && !ObjectUtils.isEmpty(device.getPassword()))? device.getPassword() : sipConfig.getPassword(); AuthorizationHeader authHead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME); if (authHead == null && !ObjectUtils.isEmpty(password)) { logger.info("[注册请求] 未携带授权头 回复401: {}", requestAddress); logger.info("[注册请求] 回复401: {}", requestAddress); response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request); new DigestServerAuthenticationHelper().generateChallenge(getHeaderFactory(), response, sipConfig.getDomain()); sipSender.transmitRequest(response); src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -1,19 +1,19 @@ package com.genersoft.iot.vmp.media.zlm; import java.text.ParseException; import java.util.HashMap; import java.util.List; import java.util.Map; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.HookType; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -24,18 +24,15 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; import javax.sip.InvalidArgumentException; import javax.sip.SipException; import java.text.ParseException; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @description:针对 ZLMediaServer的hook事件监听 @@ -571,6 +568,8 @@ public JSONObject onServerStarted(HttpServletRequest request, @RequestBody JSONObject jsonObject){ jsonObject.put("ip", request.getRemoteAddr()); System.out.println(jsonObject.toJSONString() ); ZLMServerConfig zlmServerConfig = JSON.to(ZLMServerConfig.class, jsonObject); zlmServerConfig.setIp(request.getRemoteAddr()); logger.info("[ZLM HOOK] zlm 启动 " + zlmServerConfig.getGeneralMediaServerId()); src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -10,7 +10,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; import java.util.*; @@ -294,7 +293,8 @@ */ public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) { JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId); return (mediaInfo.getInteger("code") == 0 return mediaInfo != null && (mediaInfo.getInteger("code") == 0 && mediaInfo.getJSONArray("data") != null && mediaInfo.getJSONArray("data").size() > 0); } src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java
@@ -66,7 +66,7 @@ private String hookAdminParams; @JSONField(name = "hook.alive_interval") private int hookAliveInterval; private Float hookAliveInterval; @JSONField(name = "hook.enable") private String hookEnable; @@ -798,11 +798,11 @@ this.shellPhell = shellPhell; } public int getHookAliveInterval() { public Float getHookAliveInterval() { return hookAliveInterval; } public void setHookAliveInterval(int hookAliveInterval) { public void setHookAliveInterval(Float hookAliveInterval) { this.hookAliveInterval = hookAliveInterval; } src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java
@@ -5,7 +5,6 @@ import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import io.swagger.v3.oas.annotations.media.Schema; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; import java.util.HashMap; @@ -55,7 +54,7 @@ private String secret; @Schema(description = "keepalive hook触发间隔,单位秒") private int hookAliveInterval; private Float hookAliveInterval; @Schema(description = "是否使用多端口模式") private boolean rtpEnable; @@ -332,11 +331,11 @@ this.sendRtpPortRange = sendRtpPortRange; } public int getHookAliveInterval() { public Float getHookAliveInterval() { return hookAliveInterval; } public void setHookAliveInterval(int hookAliveInterval) { public void setHookAliveInterval(Float hookAliveInterval) { this.hookAliveInterval = hookAliveInterval; } } src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -11,6 +11,9 @@ import java.util.List; import java.util.Map; /** * @author lin */ public interface IStreamPushService { List<StreamPushItem> handleJSON(String json, MediaServerItem mediaServerItem); src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -1,19 +1,32 @@ package com.genersoft.iot.vmp.service.impl; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.SsrcConfig; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.MediaServerMapper; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -24,28 +37,8 @@ import org.springframework.transaction.TransactionStatus; import org.springframework.util.ObjectUtils; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.SsrcConfig; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.dao.MediaServerMapper; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import java.time.LocalDateTime; import java.util.*; /** * 媒体服务器节点管理 @@ -129,6 +122,7 @@ @Override public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck, boolean isPlayback, Integer port) { if (mediaServerItem == null || mediaServerItem.getId() == null) { logger.info("[openRTPServer] 失败, mediaServerItem == null || mediaServerItem.getId() == null"); return null; } // 获取mediaServer可用的ssrc @@ -306,7 +300,7 @@ public void add(MediaServerItem mediaServerItem) { mediaServerItem.setCreateTime(DateUtil.getNow()); mediaServerItem.setUpdateTime(DateUtil.getNow()); mediaServerItem.setHookAliveInterval(120); mediaServerItem.setHookAliveInterval(30f); JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); if (responseJSON != null) { JSONArray data = responseJSON.getJSONArray("data"); @@ -413,7 +407,7 @@ } final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + serverItem.getId(); dynamicTask.stop(zlmKeepaliveKey); dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (serverItem.getHookAliveInterval() + 5) * 1000); dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (Math.getExponent(serverItem.getHookAliveInterval()) + 5) * 1000); publisher.zlmOnlineEventPublish(serverItem.getId()); logger.info("[ZLM] 连接成功 {} - {}:{} ", zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort()); @@ -666,7 +660,7 @@ } final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + mediaServerItem.getId(); dynamicTask.stop(zlmKeepaliveKey); dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(mediaServerItem), (mediaServerItem.getHookAliveInterval() + 5) * 1000); dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(mediaServerItem), (mediaServerItem.getHookAliveInterval().intValue() + 5) * 1000); } private MediaServerItem getOneFromDatabase(String mediaServerId) { src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -1,49 +1,28 @@ package com.genersoft.iot.vmp.service.impl; import java.math.BigDecimal; import java.math.RoundingMode; import java.text.ParseException; import java.util.*; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ServiceException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; import org.springframework.web.context.request.async.DeferredResult; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ServiceException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; @@ -53,8 +32,29 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; import java.math.BigDecimal; import java.math.RoundingMode; import java.text.ParseException; import java.util.List; import java.util.Objects; import java.util.UUID; @SuppressWarnings(value = {"rawtypes", "unchecked"}) @Service @@ -212,6 +212,15 @@ } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); logger.info(JSONObject.toJSONString(ssrcInfo)); if (ssrcInfo == null) { WVPResult wvpResult = new WVPResult(); wvpResult.setCode(ErrorCode.ERROR100.getCode()); wvpResult.setMsg("开启收流失败"); msg.setData(wvpResult); resultHolder.invokeAllResult(msg); return playResult; } play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response) -> { if (hookEvent != null) { hookEvent.response(mediaServerItem, response); @@ -249,45 +258,33 @@ ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback, String uuid) { String streamId = null; if (mediaServerItem.isRtpEnable()) { streamId = String.format("%s_%s", device.getDeviceId(), channelId); } if (ssrcInfo == null) { ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); } logger.info("[点播开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); // 超时处理 String timeOutTaskKey = UUID.randomUUID().toString(); SSRCInfo finalSsrcInfo = ssrcInfo; dynamicTask.startDelay(timeOutTaskKey, () -> { logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, finalSsrcInfo.getPort(), finalSsrcInfo.getSsrc()); timeoutCallback.run(1, "收流超时"); // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 try { cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); } catch (InvalidArgumentException | ParseException | SipException e) { logger.error("[点播超时], 发送BYE失败 {}", e.getMessage()); } catch (SsrcTransactionNotFoundException e) { timeoutCallback.run(0, "点播超时"); mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况 if (redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId) == null) { logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc()); // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 try { cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { logger.error("[点播超时], 发送BYE失败 {}", e.getMessage()); } finally { timeoutCallback.run(1, "收流超时"); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); } } }, userSetting.getPlayTimeout()); final String ssrc = ssrcInfo.getSsrc(); final String stream = ssrcInfo.getStream(); //端口获取失败的ssrcInfo 没有必要发送点播指令 if (ssrcInfo.getPort() <= 0) { logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo); return; } try { cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); System.out.println("停止超时任务: " + timeOutTaskKey); dynamicTask.stop(timeOutTaskKey); // hook响应 onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid); hookEvent.response(mediaServerItemInuse, response); @@ -303,18 +300,18 @@ //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容 String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 if (ssrc.equals(ssrcInResponse)) { if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { return; } logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { logger.info("[点播消息] SSRC修正 {}->{}", ssrc, ssrcInResponse); logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) { // ssrc 不可用 // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); event.msg = "下级自定义了ssrc,但是此ssrc不可用"; event.statusCode = 400; errorEvent.response(event); @@ -324,7 +321,7 @@ // 单端口模式streamId也有变化,需要重新设置监听 if (!mediaServerItem.isRtpEnable()) { // 添加订阅 HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); subscribe.removeSubscribe(hookSubscribe); hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { @@ -336,30 +333,30 @@ }); } // 关闭rtp server mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 重新开启ssrc server mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort()); mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, ssrcInfo.getPort()); } } }, (event) -> { dynamicTask.stop(timeOutTaskKey); mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); errorEvent.response(event); }); } catch (InvalidArgumentException | SipException | ParseException e) { logger.error("[命令发送失败] 点播消息: {}", e.getMessage()); dynamicTask.stop(timeOutTaskKey); mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null)); eventResult.msg = "命令发送失败"; errorEvent.response(eventResult); src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java
@@ -23,10 +23,10 @@ @Insert("<script> " + "INSERT into platform_gb_stream " + "(gbStreamId, platformId, catalogId,status) " + "(gbStreamId, platformId, catalogId) " + "values " + "<foreach collection='streamPushItems' index='index' item='item' separator=','> " + "(${item.gbStreamId}, '${item.platformId}', '${item.catalogId}'), '${item.status}')" + "(${item.gbStreamId}, '${item.platformId}', '${item.catalogId}')" + "</foreach> " + "</script>") int batchAdd(List<StreamPushItem> streamPushItems);