From fc77b3f819b3143387b90a4d631725e7c6513ecd Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期五, 01 十二月 2023 15:49:18 +0800
Subject: [PATCH] 支持重新接入zlm的时候检查拉流代理数据是否异常,异常数据自动移除
---
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 123 +++++++++++++++++-----------------------
1 files changed, 52 insertions(+), 71 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
old mode 100644
new mode 100755
index cf8bdd2..0d1bad6
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -27,11 +27,13 @@
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.*;
+import com.genersoft.iot.vmp.service.bean.CloudRecordItem;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import gov.nist.javax.sip.message.SIPResponse;
@@ -107,6 +109,9 @@
@Autowired
private ZlmHttpHookSubscribe subscribe;
+ @Autowired
+ private CloudRecordServiceMapper cloudRecordServiceMapper;
+
@Override
public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) {
@@ -158,10 +163,7 @@
}
}
}
- String streamId = null;
- if (mediaServerItem.isRtpEnable()) {
- streamId = String.format("%s_%s", device.getDeviceId(), channelId);
- }
+ String streamId = String.format("%s_%s", device.getDeviceId(), channelId);;
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());
if (ssrcInfo == null) {
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
@@ -457,16 +459,13 @@
logger.warn("[褰曞儚鍥炴斁] 鍗曠鍙f敹娴佹椂涓嶆敮鎸乀CP涓诲姩鏂瑰紡鏀舵祦 deviceId: {},channelId:{}", deviceId, channelId);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "鍗曠鍙f敹娴佹椂涓嶆敮鎸乀CP涓诲姩鏂瑰紡鏀舵祦");
}
- String stream = null;
- if (newMediaServerItem.isRtpEnable()) {
- String startTimeStr = startTime.replace("-", "")
- .replace(":", "")
- .replace(" ", "");
- String endTimeTimeStr = endTime.replace("-", "")
- .replace(":", "")
- .replace(" ", "");
- stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr;
- }
+ String startTimeStr = startTime.replace("-", "")
+ .replace(":", "")
+ .replace(" ", "");
+ String endTimeTimeStr = endTime.replace("-", "")
+ .replace(":", "")
+ .replace(" ", "");
+ String stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr;
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback);
}
@@ -628,44 +627,13 @@
if (ssrcInResponse != null) {
// 鍗曠鍙�
// 閲嶆柊璁㈤槄娴佷笂绾�
- HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp",
- ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
- subscribe.removeSubscribe(hookSubscribe);
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getDeviceId(),
inviteInfo.getChannelId(), null, inviteInfo.getStream());
streamSession.remove(inviteInfo.getDeviceId(),
inviteInfo.getChannelId(), inviteInfo.getStream());
-
- String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
- hookSubscribe.getContent().put("stream", stream);
-
- inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
+ inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
streamSession.put(device.getDeviceId(), channelId, ssrcTransaction.getCallId(),
- stream, ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType);
- subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
- logger.info("[Invite 200OK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + hookParam);
- dynamicTask.stop(timeOutTaskKey);
- subscribe.removeSubscribe(hookSubscribe);
- // hook鍝嶅簲
- StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, hookParam, device.getDeviceId(), channelId);
- if (streamInfo == null){
- callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
- InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
- inviteStreamService.call(inviteSessionType, device.getDeviceId(), channelId, null,
- InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
- InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
- return;
- }
- callback.run(InviteErrorCode.SUCCESS.getCode(),
- InviteErrorCode.SUCCESS.getMsg(), streamInfo);
- inviteStreamService.call(inviteSessionType, device.getDeviceId(), channelId, null,
- InviteErrorCode.SUCCESS.getCode(),
- InviteErrorCode.SUCCESS.getMsg(),
- streamInfo);
- if (inviteSessionType == InviteSessionType.PLAY) {
- snapOnPlay(mediaServerItemInUse, device.getDeviceId(), channelId, stream);
- }
- });
+ inviteInfo.getStream(), ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType);
}
}
}
@@ -786,31 +754,44 @@
logger.warn("鏌ヨ褰曞儚淇℃伅鏃跺彂鐜拌妭鐐瑰凡绂荤嚎");
return null;
}
- if (mediaServerItem.getRecordAssistPort() > 0) {
- JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream(), null);
- if (jsonObject == null) {
- throw new ControllerException(ErrorCode.ERROR100.getCode(), "杩炴帴Assist鏈嶅姟澶辫触");
- }
- if (jsonObject.getInteger("code") == 0) {
- long duration = jsonObject.getLong("data");
-
- if (duration == 0) {
- inviteInfo.getStreamInfo().setProgress(0);
- } else {
- String startTime = inviteInfo.getStreamInfo().getStartTime();
- String endTime = inviteInfo.getStreamInfo().getEndTime();
- long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
- long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
-
- BigDecimal currentCount = new BigDecimal(duration / 1000);
- BigDecimal totalCount = new BigDecimal(end - start);
- BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
- double process = divide.doubleValue();
- inviteInfo.getStreamInfo().setProgress(process);
- }
- inviteStreamService.updateInviteInfo(inviteInfo);
- }
+ if (mediaServerItem.getRecordAssistPort() == 0) {
+ throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈厤缃瓵ssist鏈嶅姟锛屾棤娉曞畬鎴愬綍鍍忎笅杞�");
}
+ SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream);
+
+ if (ssrcTransaction == null) {
+ logger.warn("[鑾峰彇涓嬭浇杩涘害]锛屾湭鎵惧埌涓嬭浇浜嬪姟淇℃伅");
+ return null;
+ }
+
+ // 涓轰簡鏀寔澶氫釜鏁版嵁搴擄紝杩欓噷涓嶈兘浣跨敤姹傚拰鍑芥暟鏉ョ洿鎺ヨ幏鍙栨�绘暟浜�
+ List<CloudRecordItem> cloudRecordItemList = cloudRecordServiceMapper.getList(null, "rtp", inviteInfo.getStream(), null, null, ssrcTransaction.getCallId(), null);
+
+ if (cloudRecordItemList.isEmpty()) {
+ logger.warn("[鑾峰彇涓嬭浇杩涘害]锛屾湭鎵惧埌涓嬭浇瑙嗛淇℃伅");
+ return null;
+ }
+ long duration = 0;
+ for (CloudRecordItem cloudRecordItem : cloudRecordItemList) {
+ duration += cloudRecordItem.getTimeLen();
+ }
+ if (duration == 0) {
+ inviteInfo.getStreamInfo().setProgress(0);
+ } else {
+ String startTime = inviteInfo.getStreamInfo().getStartTime();
+ String endTime = inviteInfo.getStreamInfo().getEndTime();
+ // 姝ゆ椂start鍜宔nd鍗曚綅鏄
+ long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
+ long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
+
+ BigDecimal currentCount = new BigDecimal(duration);
+ BigDecimal totalCount = new BigDecimal((end - start) * 1000);
+ BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
+ double process = divide.doubleValue();
+ inviteInfo.getStreamInfo().setProgress(process);
+ }
+ inviteStreamService.updateInviteInfo(inviteInfo);
+
return inviteInfo.getStreamInfo();
}
return null;
--
Gitblit v1.8.0