From e1d476a54a7aa72f429e41e5c2957b77edbeb0a7 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期三, 13 三月 2024 15:24:09 +0800
Subject: [PATCH] 处理收到redis推动的推流设备信息内容重复的问题
---
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 5 +++++
src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java | 4 ++++
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java | 9 +++++++++
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java | 21 +++++++++++++++------
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java | 4 ++++
src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java | 7 ++++++-
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java | 3 +++
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java | 7 ++++---
8 files changed, 50 insertions(+), 10 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java b/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java
index 43f1a8a..e409097 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java
@@ -7,6 +7,7 @@
import com.github.pagehelper.PageInfo;
import java.util.List;
+import java.util.Map;
/**
* 绾ц仈鍥芥爣骞冲彴鍏宠仈娴佷笟鍔℃帴鍙�
@@ -71,4 +72,7 @@
void delAllPlatformInfo(String platformId, String catalogId);
List<GbStream> getGbChannelWithGbid(String gbId);
+
+ Map<String, GbStream> getAllGBId();
+
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
index 333b7b3..10b1eff 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -115,4 +115,7 @@
*/
ResourceBaseInfo getOverview();
+ Map<String, StreamPushItem> getAllAppAndStreamMap();
+
+
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java
index 9fcbb40..c2c9d72 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java
@@ -19,11 +19,11 @@
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
@Service
@DS("master")
@@ -268,4 +268,9 @@
public List<GbStream> getGbChannelWithGbid(String gbId) {
return gbStreamMapper.selectByGBId(gbId);
}
+
+ @Override
+ public Map<String, GbStream> getAllGBId() {
+ return gbStreamMapper.getAllGBId();
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
index 13c452c..e2d7e68 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -548,4 +548,9 @@
return new ResourceBaseInfo(total, online);
}
+
+ @Override
+ public Map<String, StreamPushItem> getAllAppAndStreamMap() {
+ return streamPushMapper.getAllAppAndStreamMap();
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
index cb34ff5..24a19f3 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
@@ -2,6 +2,7 @@
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
+import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
@@ -19,6 +20,7 @@
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@@ -57,7 +59,8 @@
try {
List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
//鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀�
- List<String> allAppAndStream = streamPushService.getAllAppAndStream();
+ Map<String, StreamPushItem> allAppAndStream = streamPushService.getAllAppAndStreamMap();
+ Map<String, GbStream> allGBId = gbStreamService.getAllGBId();
/**
* 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛�
@@ -67,9 +70,15 @@
for (StreamPushItem streamPushItem : streamPushItems) {
String app = streamPushItem.getApp();
String stream = streamPushItem.getStream();
- boolean contains = allAppAndStream.contains(app + stream);
+ boolean contains = allAppAndStream.containsKey(app + stream);
//涓嶅瓨鍦ㄥ氨娣诲姞
if (!contains) {
+ if (allGBId.containsKey(streamPushItem.getGbId())) {
+ GbStream gbStream = allGBId.get(streamPushItem.getGbId());
+ logger.warn("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊] 鍥芥爣缂栧彿閲嶅: {}, 宸插垎閰嶇粰{}/{}",
+ streamPushItem.getGbId(), gbStream.getApp(), gbStream.getStream());
+ continue;
+ }
streamPushItem.setStreamType("push");
streamPushItem.setCreateTime(DateUtil.getNow());
streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
@@ -77,25 +86,25 @@
streamPushItem.setOriginTypeStr("rtsp_push");
streamPushItem.setTotalReaderCount("0");
streamPushItemForSave.add(streamPushItem);
+ allGBId.put(streamPushItem.getGbId(), streamPushItem);
} else {
//瀛樺湪灏卞彧淇敼 name鍜実bId
streamPushItemForUpdate.add(streamPushItem);
}
}
- if (streamPushItemForSave.size() > 0) {
-
+ if (!streamPushItemForSave.isEmpty()) {
logger.info("娣诲姞{}鏉�",streamPushItemForSave.size());
logger.info(JSONObject.toJSONString(streamPushItemForSave));
streamPushService.batchAdd(streamPushItemForSave);
}
- if(streamPushItemForUpdate.size()>0){
+ if(!streamPushItemForUpdate.isEmpty()){
logger.info("淇敼{}鏉�",streamPushItemForUpdate.size());
logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
}
}catch (Exception e) {
- logger.warn("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
+ logger.warn("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", new String(message.getBody()));
logger.error("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊] 寮傚父鍐呭锛� ", e);
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
index 6591e3f..3790bda 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -10,6 +10,7 @@
import org.springframework.stereotype.Repository;
import java.util.List;
+import java.util.Map;
@Mapper
@Repository
@@ -170,4 +171,7 @@
@Select("SELECT status FROM wvp_stream_push WHERE app=#{app} AND stream=#{stream}")
Boolean selectStatusForPush(@Param("app") String app, @Param("stream") String stream);
+ @MapKey("gbId")
+ @Select("SELECT * from wvp_gb_stream")
+ Map<String, GbStream> getAllGBId();
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
index 682f07c..daf21ef 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
@@ -7,6 +7,7 @@
import org.springframework.stereotype.Repository;
import java.util.List;
+import java.util.Map;
@Mapper
@Repository
@@ -195,4 +196,12 @@
"</foreach>" +
"</script>")
List<StreamPushItem> getListIn(List<StreamPushItem> streamPushItems);
+
+ @MapKey("vhost")
+ @Select("SELECT CONCAT(wsp.app, wsp.stream) as vhost, wsp.app, wsp.stream, wgs.gb_id, wgs.name " +
+ " from wvp_stream_push wsp " +
+ " left join wvp_gb_stream wgs on wgs.app = wsp.app and wgs.stream = wsp.stream")
+ Map<String, StreamPushItem> getAllAppAndStreamMap();
+
+
}
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java
index d479180..e65a579 100755
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java
@@ -3,10 +3,9 @@
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IPlatformService;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.gb28181.gbStream.bean.GbStreamParam;
import com.github.pagehelper.PageInfo;
@@ -20,7 +19,6 @@
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
-import java.util.ArrayList;
import java.util.List;
@Tag(name = "瑙嗛娴佸叧鑱斿埌绾ц仈骞冲彴")
@@ -35,6 +33,9 @@
private IGbStreamService gbStreamService;
@Autowired
+ private IStreamPushService service;
+
+ @Autowired
private IPlatformService platformService;
--
Gitblit v1.8.0