From fb4336c5e01e3cff19064945fdb47a0b4e1f9bab Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期三, 01 二月 2023 10:57:02 +0800
Subject: [PATCH] Merge pull request #276 from brokge/fix-269
---
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 135 +++++++++++++++++++-------------------------
1 files changed, 59 insertions(+), 76 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index a56f2d0..b8e7c8e 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -39,11 +39,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
-import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
@@ -103,10 +100,6 @@
@Autowired
private ZlmHttpHookSubscribe subscribe;
-
- @Qualifier("taskExecutor")
- @Autowired
- private ThreadPoolTaskExecutor taskExecutor;
@Override
public void play(MediaServerItem mediaServerItem, String deviceId, String channelId,
@@ -381,14 +374,10 @@
}
}
- private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
- RequestMessage msg = new RequestMessage();
- msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
- if (!ObjectUtils.isEmpty(uuid)) {
- msg.setId(uuid);
- }
- StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
+ private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, PlayBackCallback playBackCallback) {
+ StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
+ PlayBackResult<StreamInfo> playBackResult = new PlayBackResult<>();
if (streamInfo != null) {
DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
if (deviceChannel != null) {
@@ -397,17 +386,16 @@
}
redisCatchStorage.startPlay(streamInfo);
- WVPResult wvpResult = new WVPResult();
- wvpResult.setCode(ErrorCode.SUCCESS.getCode());
- wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
- wvpResult.setData(streamInfo);
- msg.setData(wvpResult);
- resultHolder.invokeAllResult(msg);
+ playBackResult.setCode(ErrorCode.SUCCESS.getCode());
+ playBackResult.setMsg(ErrorCode.SUCCESS.getMsg());
+ playBackResult.setData(streamInfo);
+ playBackCallback.call(playBackResult);
} else {
logger.warn("褰曞儚鍥炴斁璋冪敤澶辫触锛�");
- msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "褰曞儚鍥炴斁璋冪敤澶辫触锛�"));
- resultHolder.invokeAllResult(msg);
+ playBackResult.setCode(ErrorCode.ERROR100.getCode());
+ playBackResult.setMsg("褰曞儚鍥炴斁璋冪敤澶辫触锛�");
+ playBackCallback.call(playBackResult);
}
}
@@ -418,7 +406,7 @@
}
MediaServerItem mediaServerItem;
if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
- mediaServerItem = mediaServerService.getMediaServerForMinimumLoad();
+ mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
} else {
mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
}
@@ -429,45 +417,56 @@
}
@Override
- public DeferredResult<WVPResult<StreamInfo>> playBack(String deviceId, String channelId, String startTime,
+ public MediaServerItem getNewMediaServerItemHasAssist(Device device) {
+ if (device == null) {
+ return null;
+ }
+ MediaServerItem mediaServerItem;
+ if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
+ mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(true);
+ } else {
+ mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
+ }
+ if (mediaServerItem == null) {
+ logger.warn("[鑾峰彇鍙敤鐨刏LM鑺傜偣]鏈壘鍒板彲浣跨敤鐨刏LM...");
+ }
+ return mediaServerItem;
+ }
+
+ @Override
+ public void playBack(String deviceId, String channelId, String startTime,
String endTime, InviteStreamCallback inviteStreamCallback,
PlayBackCallback callback) {
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
- return null;
+ return;
}
MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, device.isSsrcCheck(), true);
- return playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback);
+ playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback);
}
@Override
- public DeferredResult<WVPResult<StreamInfo>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
+ public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
String deviceId, String channelId, String startTime,
String endTime, InviteStreamCallback infoCallBack,
PlayBackCallback playBackCallback) {
if (mediaServerItem == null || ssrcInfo == null) {
- return null;
+ return;
}
- String uuid = UUID.randomUUID().toString();
- String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId;
+
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "璁惧锛� " + deviceId + "涓嶅瓨鍦�");
}
- DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(30000L);
- resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid, result);
- RequestMessage requestMessage = new RequestMessage();
- requestMessage.setId(uuid);
- requestMessage.setKey(key);
- PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>();
+
+ PlayBackResult<StreamInfo> playBackResult = new PlayBackResult<>();
String playBackTimeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(playBackTimeOutTaskKey, () -> {
logger.warn(String.format("璁惧鍥炴斁瓒呮椂锛宒eviceId锛�%s 锛宑hannelId锛�%s", deviceId, channelId));
playBackResult.setCode(ErrorCode.ERROR100.getCode());
playBackResult.setMsg("鍥炴斁瓒呮椂");
- playBackResult.setData(requestMessage);
try {
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
@@ -479,19 +478,14 @@
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
}
-
// 鍥炲涔嬪墠鎵�鏈夌殑鐐规挱璇锋眰
playBackCallback.call(playBackResult);
- result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "鍥炴斁瓒呮椂"));
- resultHolder.exist(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid);
}, userSetting.getPlayTimeout());
SipSubscribe.Event errorEvent = event -> {
dynamicTask.stop(playBackTimeOutTaskKey);
- requestMessage.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("鍥炴斁澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg)));
playBackResult.setCode(ErrorCode.ERROR100.getCode());
playBackResult.setMsg(String.format("鍥炴斁澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg));
- playBackResult.setData(requestMessage);
playBackResult.setEvent(event);
playBackCallback.call(playBackResult);
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
@@ -509,11 +503,9 @@
return;
}
redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId());
- WVPResult<StreamInfo> success = WVPResult.success(streamInfo);
- requestMessage.setData(success);
playBackResult.setCode(ErrorCode.SUCCESS.getCode());
playBackResult.setMsg(ErrorCode.SUCCESS.getMsg());
- playBackResult.setData(requestMessage);
+ playBackResult.setData(streamInfo);
playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
playBackResult.setResponse(inviteStreamInfo.getResponse());
playBackCallback.call(playBackResult);
@@ -560,7 +552,7 @@
logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString());
dynamicTask.stop(playBackTimeOutTaskKey);
// hook鍝嶅簲
- onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
+ onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, playBackCallback);
hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
});
}
@@ -580,50 +572,45 @@
eventResult.msg = "鍛戒护鍙戦�佸け璐�";
errorEvent.response(eventResult);
}
- return result;
}
@Override
- public DeferredResult<WVPResult<StreamInfo>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
+ public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback playBackCallback) {
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
- return null;
+ return;
}
- MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
+ MediaServerItem newMediaServerItem = getNewMediaServerItemHasAssist(device);
+ if (newMediaServerItem == null) {
+ PlayBackResult<StreamInfo> downloadResult = new PlayBackResult<>();
+ downloadResult.setCode(ErrorCode.ERROR100.getCode());
+ downloadResult.setMsg("鏈壘鍒癮ssist鏈嶅姟");
+ playBackCallback.call(downloadResult);
+ return;
+ }
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, device.isSsrcCheck(), true);
- return download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, infoCallBack, hookCallBack);
+ download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, infoCallBack, playBackCallback);
}
+
@Override
- public DeferredResult<WVPResult<StreamInfo>> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
+ public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
if (mediaServerItem == null || ssrcInfo == null) {
- return null;
+ return;
}
- String uuid = UUID.randomUUID().toString();
- String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId;
- DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(30000L);
+
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "璁惧锛�" + deviceId + "涓嶅瓨鍦�");
}
-
- resultHolder.put(key, uuid, result);
- RequestMessage requestMessage = new RequestMessage();
- requestMessage.setId(uuid);
- requestMessage.setKey(key);
- WVPResult<StreamInfo> wvpResult = new WVPResult<>();
- requestMessage.setData(wvpResult);
- PlayBackResult<RequestMessage> downloadResult = new PlayBackResult<>();
- downloadResult.setData(requestMessage);
+ PlayBackResult<StreamInfo> downloadResult = new PlayBackResult<>();
String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> {
logger.warn(String.format("褰曞儚涓嬭浇璇锋眰瓒呮椂锛宒eviceId锛�%s 锛宑hannelId锛�%s", deviceId, channelId));
- wvpResult.setCode(ErrorCode.ERROR100.getCode());
- wvpResult.setMsg("褰曞儚涓嬭浇璇锋眰瓒呮椂");
downloadResult.setCode(ErrorCode.ERROR100.getCode());
downloadResult.setMsg("褰曞儚涓嬭浇璇锋眰瓒呮椂");
hookCallBack.call(downloadResult);
@@ -638,16 +625,12 @@
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
}
- // 鍥炲涔嬪墠鎵�鏈夌殑鐐规挱璇锋眰
- hookCallBack.call(downloadResult);
}, userSetting.getPlayTimeout());
SipSubscribe.Event errorEvent = event -> {
dynamicTask.stop(downLoadTimeOutTaskKey);
downloadResult.setCode(ErrorCode.ERROR100.getCode());
downloadResult.setMsg(String.format("褰曞儚涓嬭浇澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg));
- wvpResult.setCode(ErrorCode.ERROR100.getCode());
- wvpResult.setMsg(String.format("褰曞儚涓嬭浇澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg));
downloadResult.setEvent(event);
hookCallBack.call(downloadResult);
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
@@ -662,11 +645,9 @@
streamInfo.setStartTime(startTime);
streamInfo.setEndTime(endTime);
redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
- wvpResult.setCode(ErrorCode.SUCCESS.getCode());
- wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
- wvpResult.setData(streamInfo);
downloadResult.setCode(ErrorCode.SUCCESS.getCode());
downloadResult.setMsg(ErrorCode.SUCCESS.getMsg());
+ downloadResult.setData(streamInfo);
downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
downloadResult.setResponse(inviteStreamInfo.getResponse());
hookCallBack.call(downloadResult);
@@ -678,7 +659,6 @@
eventResult.msg = "鍛戒护鍙戦�佸け璐�";
errorEvent.response(eventResult);
}
- return result;
}
@Override
@@ -698,7 +678,10 @@
}
if (mediaServerItem.getRecordAssistPort() > 0) {
JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, streamInfo.getApp(), streamInfo.getStream(), null);
- if (jsonObject != null && jsonObject.getInteger("code") == 0) {
+ if (jsonObject == null) {
+ throw new ControllerException(ErrorCode.ERROR100.getCode(), "杩炴帴Assist鏈嶅姟澶辫触");
+ }
+ if (jsonObject.getInteger("code") == 0) {
long duration = jsonObject.getLong("data");
if (duration == 0) {
--
Gitblit v1.8.0