From 8870f5f5a182f4af527dc2b89ad75063019df14f Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 10 十一月 2022 09:40:01 +0800
Subject: [PATCH] 优化使用来源ip作为流ip

---
 src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java |  166 ++++++++++++++++++++++++++++++-------------------------
 1 files changed, 90 insertions(+), 76 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index f899db3..3bbe88b 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -1,49 +1,28 @@
 package com.genersoft.iot.vmp.service.impl;
 
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.text.ParseException;
-import java.util.*;
-
-import javax.sip.InvalidArgumentException;
-import javax.sip.ResponseEvent;
-import javax.sip.SipException;
-
-import com.genersoft.iot.vmp.conf.exception.ControllerException;
-import com.genersoft.iot.vmp.conf.exception.ServiceException;
-import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
-import com.genersoft.iot.vmp.gb28181.bean.*;
-import com.genersoft.iot.vmp.service.IDeviceService;
-import com.genersoft.iot.vmp.utils.redis.RedisUtil;
-import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Service;
-import org.springframework.util.ObjectUtils;
-import org.springframework.web.context.request.async.DeferredResult;
-
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.exception.ControllerException;
+import com.genersoft.iot.vmp.conf.exception.ServiceException;
+import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
+import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
+import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
+import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
-import com.genersoft.iot.vmp.utils.DateUtil;
-import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.service.IDeviceService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IMediaService;
 import com.genersoft.iot.vmp.service.IPlayService;
@@ -53,8 +32,27 @@
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import com.genersoft.iot.vmp.utils.redis.RedisUtil;
+import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
-import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Service;
+import org.springframework.util.ObjectUtils;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import javax.sip.InvalidArgumentException;
+import javax.sip.ResponseEvent;
+import javax.sip.SipException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.text.ParseException;
+import java.util.List;
+import java.util.UUID;
 
 @SuppressWarnings(value = {"rawtypes", "unchecked"})
 @Service
@@ -111,46 +109,19 @@
     private ThreadPoolTaskExecutor taskExecutor;
 
     @Override
-    public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
-                           ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
-                           Runnable timeoutCallback) {
+    public void play(MediaServerItem mediaServerItem, String deviceId, String channelId,
+                                 ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
+                                 Runnable timeoutCallback) {
         if (mediaServerItem == null) {
             throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈壘鍒板彲鐢ㄧ殑zlm");
         }
-        PlayResult playResult = new PlayResult();
+
         RequestMessage msg = new RequestMessage();
         String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
         msg.setKey(key);
-        String uuid = UUID.randomUUID().toString();
-        msg.setId(uuid);
-        playResult.setUuid(uuid);
-        DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
-        playResult.setResult(result);
-        // 褰曞儚鏌ヨ浠hannelId浣滀负deviceId鏌ヨ
-        resultHolder.put(key, uuid, result);
 
         Device device = redisCatchStorage.getDevice(deviceId);
         StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
-        playResult.setDevice(device);
-
-        result.onCompletion(() -> {
-            // 鐐规挱缁撴潫鏃惰皟鐢ㄦ埅鍥炬帴鍙�
-            taskExecutor.execute(() -> {
-                // TODO 搴旇鍦ㄤ笂娴佹椂璋冪敤鏇村ソ锛岀粨鏉熶篃鍙兘鏄敊璇粨鏉�
-                String path = "snap";
-                String fileName = deviceId + "_" + channelId + ".jpg";
-                WVPResult wvpResult = (WVPResult) result.getResult();
-                if (Objects.requireNonNull(wvpResult).getCode() == 0) {
-                    StreamInfo streamInfoForSuccess = (StreamInfo) wvpResult.getData();
-                    MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
-                    String streamUrl = streamInfoForSuccess.getFmp4().getUrl();
-
-                    // 璇锋眰鎴浘
-                    logger.info("[璇锋眰鎴浘]: " + fileName);
-                    zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
-                }
-            });
-        });
 
         if (streamInfo != null) {
             String streamId = streamInfo.getStream();
@@ -160,7 +131,7 @@
                 wvpResult.setMsg("鐐规挱澶辫触锛� redis缂撳瓨streamId绛変簬null");
                 msg.setData(wvpResult);
                 resultHolder.invokeAllResult(msg);
-                return playResult;
+                return;
             }
             String mediaServerId = streamInfo.getMediaServerId();
             MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
@@ -178,14 +149,13 @@
                         msg.setData(wvpResult);
 
                         resultHolder.invokeAllResult(msg);
-                        return playResult;
+                        return;
                     } else {
                         WVPResult wvpResult = new WVPResult();
                         wvpResult.setCode(ErrorCode.SUCCESS.getCode());
                         wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
                         wvpResult.setData(streamInfo);
                         msg.setData(wvpResult);
-
                         resultHolder.invokeAllResult(msg);
                         if (hookEvent != null) {
                             hookEvent.response(mediaServerItem, JSON.parseObject(JSON.toJSONString(streamInfo)));
@@ -211,7 +181,6 @@
                 streamId = String.format("%s_%s", device.getDeviceId(), channelId);
             }
             SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
-            logger.info(JSONObject.toJSONString(ssrcInfo));
             play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response) -> {
                 if (hookEvent != null) {
                     hookEvent.response(mediaServerItem, response);
@@ -238,16 +207,15 @@
                 msg.setData(wvpResult);
                 // 鍥炲涔嬪墠鎵�鏈夌殑鐐规挱璇锋眰
                 resultHolder.invokeAllResult(msg);
-            }, uuid);
+            });
         }
-        return playResult;
     }
 
 
     @Override
     public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
                      ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
-                     InviteTimeOutCallback timeoutCallback, String uuid) {
+                     InviteTimeOutCallback timeoutCallback) {
 
         String streamId = null;
         if (mediaServerItem.isRtpEnable()) {
@@ -281,6 +249,16 @@
         //绔彛鑾峰彇澶辫触鐨剆srcInfo 娌℃湁蹇呰鍙戦�佺偣鎾寚浠�
         if (ssrcInfo.getPort() <= 0) {
             logger.info("[鐐规挱绔彛鍒嗛厤寮傚父]锛宒eviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
+            dynamicTask.stop(timeOutTaskKey);
+            // 閲婃斁ssrc
+            mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
+
+            streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+
+            RequestMessage msg = new RequestMessage();
+            msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + device.getDeviceId() + channelId);
+            msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "鐐规挱绔彛鍒嗛厤寮傚父"));
+            resultHolder.invokeAllResult(msg);
             return;
         }
         try {
@@ -289,9 +267,15 @@
                 System.out.println("鍋滄瓒呮椂浠诲姟锛� " + timeOutTaskKey);
                 dynamicTask.stop(timeOutTaskKey);
                 // hook鍝嶅簲
-                onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid);
+                onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId);
                 hookEvent.response(mediaServerItemInuse, response);
                 logger.info("[鐐规挱鎴愬姛] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
+                String streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp",  stream);
+                String path = "snap";
+                String fileName = device.getDeviceId() + "_" + channelId + ".jpg";
+                // 璇锋眰鎴浘
+                logger.info("[璇锋眰鎴浘]: " + fileName);
+                zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
 
             }, (event) -> {
                 ResponseEvent responseEvent = (ResponseEvent) event.event;
@@ -331,7 +315,7 @@
                                 logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString());
                                 dynamicTask.stop(timeOutTaskKey);
                                 // hook鍝嶅簲
-                                onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
+                                onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId);
                                 hookEvent.response(mediaServerItemInUse, response);
                             });
                         }
@@ -367,13 +351,41 @@
     }
 
     @Override
-    public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
+    public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) {
+        StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
         RequestMessage msg = new RequestMessage();
-        if (uuid != null) {
+        msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
+        if (streamInfo != null) {
+            DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
+            if (deviceChannel != null) {
+                deviceChannel.setStreamId(streamInfo.getStream());
+                storager.startPlay(deviceId, channelId, streamInfo.getStream());
+            }
+            redisCatchStorage.startPlay(streamInfo);
+
+            WVPResult wvpResult = new WVPResult();
+            wvpResult.setCode(ErrorCode.SUCCESS.getCode());
+            wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
+            wvpResult.setData(streamInfo);
+
+            msg.setData(wvpResult);
+            resultHolder.invokeAllResult(msg);
+
+        } else {
+            logger.warn("璁惧棰勮API璋冪敤澶辫触锛�");
+            msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "璁惧棰勮API璋冪敤澶辫触锛�"));
+            resultHolder.invokeAllResult(msg);
+        }
+    }
+
+    private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
+        RequestMessage msg = new RequestMessage();
+        msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
+        if (!ObjectUtils.isEmpty(uuid)) {
             msg.setId(uuid);
         }
-        msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
         StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
+
         if (streamInfo != null) {
             DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
             if (deviceChannel != null) {
@@ -390,8 +402,8 @@
 
             resultHolder.invokeAllResult(msg);
         } else {
-            logger.warn("璁惧棰勮API璋冪敤澶辫触锛�");
-            msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "璁惧棰勮API璋冪敤澶辫触锛�"));
+            logger.warn("褰曞儚鍥炴斁璋冪敤澶辫触锛�");
+            msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "褰曞儚鍥炴斁璋冪敤澶辫触锛�"));
             resultHolder.invokeAllResult(msg);
         }
     }
@@ -545,7 +557,7 @@
                                             logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString());
                                             dynamicTask.stop(playBackTimeOutTaskKey);
                                             // hook鍝嶅簲
-                                            onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
+                                            onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
                                             hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
                                         });
                                     }
@@ -568,6 +580,8 @@
         return result;
     }
 
+
+
     @Override
     public DeferredResult<WVPResult<StreamInfo>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
         Device device = storager.queryVideoDevice(deviceId);

--
Gitblit v1.8.0