From 3d6db7478d79b824f93708f936c598cc622221f2 Mon Sep 17 00:00:00 2001
From: mk1990 <153958232@qq.com>
Date: 星期一, 20 六月 2022 15:10:31 +0800
Subject: [PATCH] fix设备状态查询接口
---
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 211 +++++++++++++++++++++++++++++++++++++++++++++-------
1 files changed, 183 insertions(+), 28 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
index c8bf191..1e00faa 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -1,37 +1,35 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
-import com.genersoft.iot.vmp.common.StreamInfo;
-import com.genersoft.iot.vmp.conf.UserSetup;
-import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
-import com.genersoft.iot.vmp.gb28181.bean.GbStream;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
-import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
-import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
+import com.genersoft.iot.vmp.storager.dao.*;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.*;
+import java.util.stream.Collectors;
@Service
public class StreamPushServiceImpl implements IStreamPushService {
+
+ private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class);
@Autowired
private GbStreamMapper gbStreamMapper;
@@ -41,6 +39,9 @@
@Autowired
private ParentPlatformMapper parentPlatformMapper;
+
+ @Autowired
+ private PlatformCatalogMapper platformCatalogMapper;
@Autowired
private PlatformGbStreamMapper platformGbStreamMapper;
@@ -58,14 +59,16 @@
private IRedisCatchStorage redisCatchStorage;
@Autowired
- private UserSetup userSetup;
+ private UserSetting userSetting;
@Autowired
private IMediaServerService mediaServerService;
@Override
public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
- if (jsonData == null) return null;
+ if (jsonData == null) {
+ return null;
+ }
Map<String, StreamPushItem> result = new HashMap<>();
@@ -94,36 +97,37 @@
streamPushItem.setMediaServerId(item.getMediaServerId());
streamPushItem.setStream(item.getStream());
streamPushItem.setAliveSecond(item.getAliveSecond());
- streamPushItem.setCreateStamp(item.getCreateStamp());
streamPushItem.setOriginSock(item.getOriginSock());
streamPushItem.setTotalReaderCount(item.getTotalReaderCount());
streamPushItem.setOriginType(item.getOriginType());
streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
streamPushItem.setOriginUrl(item.getOriginUrl());
- streamPushItem.setCreateStamp(item.getCreateStamp());
+ streamPushItem.setCreateStamp(item.getCreateStamp() * 1000);
streamPushItem.setAliveSecond(item.getAliveSecond());
streamPushItem.setStatus(true);
streamPushItem.setStreamType("push");
streamPushItem.setVhost(item.getVhost());
+ streamPushItem.setServerId(item.getSeverId());
return streamPushItem;
}
@Override
- public PageInfo<StreamPushItem> getPushList(Integer page, Integer count) {
+ public PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
PageHelper.startPage(page, count);
- List<StreamPushItem> all = streamPushMapper.selectAll();
+ List<StreamPushItem> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
return new PageInfo<>(all);
}
@Override
public List<StreamPushItem> getPushList(String mediaServerId) {
- return streamPushMapper.selectAllByMediaServerId(mediaServerId);
+ return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
}
@Override
public boolean saveToGB(GbStream stream) {
stream.setStreamType("push");
stream.setStatus(true);
+ stream.setCreateStamp(System.currentTimeMillis());
int add = gbStreamMapper.add(stream);
// 鏌ユ壘寮�鍚簡鍏ㄩ儴鐩存挱娴佸叡浜殑涓婄骇骞冲彴
@@ -155,12 +159,17 @@
public boolean removeFromGB(GbStream stream) {
// 鍒ゆ柇鏄惁闇�瑕佸彂閫佷簨浠�
gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
- int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
+ int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream());
- if (mediaList == null) {
- streamPushMapper.del(stream.getApp(), stream.getStream());
+ if (mediaList != null) {
+ if (mediaList.getInteger("code") == 0) {
+ JSONArray data = mediaList.getJSONArray("data");
+ if (data == null) {
+ streamPushMapper.del(stream.getApp(), stream.getStream());
+ }
+ }
}
return del > 0;
}
@@ -177,9 +186,9 @@
StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
- int delStream = streamPushMapper.del(app, streamId);
- gbStreamMapper.del(app, streamId);
platformGbStreamMapper.delByAppAndStream(app, streamId);
+ gbStreamMapper.del(app, streamId);
+ int delStream = streamPushMapper.del(app, streamId);
if (delStream > 0) {
MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId);
@@ -202,7 +211,9 @@
Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>();
if (pushList.size() > 0) {
for (StreamPushItem streamPushItem : pushList) {
- pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
+ if (StringUtils.isEmpty(streamPushItem.getGbId())) {
+ pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
+ }
}
}
if (mediaItems.size() > 0) {
@@ -211,7 +222,9 @@
}
}
zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
- if (mediaList == null) return;
+ if (mediaList == null) {
+ return;
+ }
String dataStr = mediaList.getString("data");
Integer code = mediaList.getInteger("code");
@@ -251,7 +264,7 @@
String type = "PUSH";
for (MediaItem offlineMediaItem : offlineMediaItemList) {
JSONObject jsonObject = new JSONObject();
- jsonObject.put("serverId", userSetup.getServerId());
+ jsonObject.put("serverId", userSetting.getServerId());
jsonObject.put("app", offlineMediaItem.getApp());
jsonObject.put("stream", offlineMediaItem.getStream());
jsonObject.put("register", false);
@@ -266,7 +279,7 @@
@Override
public void zlmServerOffline(String mediaServerId) {
- List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerId(mediaServerId);
+ List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
// 绉婚櫎娌℃湁GBId鐨勬帹娴�
streamPushMapper.deleteWithoutGBId(mediaServerId);
gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
@@ -281,7 +294,7 @@
// 绉婚櫎redis鍐呮祦鐨勪俊鎭�
redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream());
JSONObject jsonObject = new JSONObject();
- jsonObject.put("serverId", userSetup.getServerId());
+ jsonObject.put("serverId", userSetting.getServerId());
jsonObject.put("app", mediaItem.getApp());
jsonObject.put("stream", mediaItem.getStream());
jsonObject.put("register", false);
@@ -304,6 +317,7 @@
streamPushItem.setStreamType("push");
streamPushItem.setStatus(true);
streamPushItem.setGbId("34020000004111" + gbId);
+ streamPushItem.setCreateStamp(System.currentTimeMillis());
gbId ++;
}
int limitCount = 30;
@@ -321,4 +335,145 @@
}
return true;
}
+
+ @Override
+ public void batchAdd(List<StreamPushItem> streamPushItems) {
+ streamPushMapper.addAll(streamPushItems);
+ gbStreamMapper.batchAdd(streamPushItems);
+ // 鏌ユ壘寮�鍚簡鍏ㄩ儴鐩存挱娴佸叡浜殑涓婄骇骞冲彴
+ List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
+ if (parentPlatforms.size() > 0) {
+ for (StreamPushItem stream : streamPushItems) {
+ for (ParentPlatform parentPlatform : parentPlatforms) {
+ stream.setCatalogId(parentPlatform.getCatalogId());
+ stream.setPlatformId(parentPlatform.getServerGBId());
+ String streamId = stream.getStream();
+ StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId());
+ if (streamProxyItem == null) {
+ platformGbStreamMapper.add(stream);
+ eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
+ }else {
+ if (!streamProxyItem.getGbId().equals(stream.getGbId())) {
+ // 姝ゆ祦浣跨敤鍙︿竴涓浗鏍嘔d宸茬粡涓庤骞冲彴鍏宠仈锛岀Щ闄ゆ璁板綍
+ platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId());
+ platformGbStreamMapper.add(stream);
+ eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
+ stream.setGbId(streamProxyItem.getGbId());
+ eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.DEL);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
+ // 瀛樺偍鏁版嵁鍒皊tream_push琛�
+ streamPushMapper.addAll(streamPushItems);
+ List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream()
+ .filter(streamPushItem-> streamPushItem.getId() != null)
+ .collect(Collectors.toList());
+ // 瀛樺偍鏁版嵁鍒癵b_stream琛紝 id浼氳繑鍥炲埌streamPushItemForGbStream閲�
+ if (streamPushItemForGbStream.size() > 0) {
+ gbStreamMapper.batchAdd(streamPushItemForGbStream);
+ }
+ // 鍘婚櫎娌℃湁ID涔熷氨鏄病鏈夊瓨鍌ㄥ埌鏁版嵁搴撶殑鏁版嵁
+ List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
+ .filter(streamPushItem-> streamPushItem.getGbStreamId() != null)
+ .collect(Collectors.toList());
+
+ if (streamPushItemsForPlatform.size() > 0) {
+ // 鑾峰彇鎵�鏈夊钩鍙帮紝骞冲彴鍜岀洰褰曚俊鎭竴鑸笉浼氱壒鍒ぇ閲忋��
+ List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
+ Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
+ if (parentPlatformList.size() == 0) {
+ return;
+ }
+ for (ParentPlatform platform : parentPlatformList) {
+ Map<String, PlatformCatalog> catalogMap = new HashMap<>();
+
+ // 鍒涘缓鏍硅妭鐐�
+ PlatformCatalog platformCatalog = new PlatformCatalog();
+ platformCatalog.setId(platform.getServerGBId());
+ catalogMap.put(platform.getServerGBId(), platformCatalog);
+
+ // 鏌ヨ鎵�鏈夎妭鐐逛俊鎭�
+ List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
+ if (platformCatalogs.size() > 0) {
+ for (PlatformCatalog catalog : platformCatalogs) {
+ catalogMap.put(catalog.getId(), catalog);
+ }
+ }
+ platformInfoMap.put(platform.getServerGBId(), catalogMap);
+ }
+ List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
+ Map<String, List<GbStream>> platformForEvent = new HashMap<>();
+ // 閬嶅巻瀛樺偍缁撴灉锛屾煡鎵綼pp+Stream->platformId+catalogId鐨勫搴斿叧绯伙紝鐒跺悗鎵ц鎵归噺鍐欏叆
+ for (StreamPushItem streamPushItem : streamPushItemsForPlatform) {
+ List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
+ if (platFormInfoList != null && platFormInfoList.size() > 0) {
+ for (String[] platFormInfoArray : platFormInfoList) {
+ StreamPushItem streamPushItemForPlatform = new StreamPushItem();
+ streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
+ if (platFormInfoArray.length > 0) {
+ // 鏁扮粍 platFormInfoArray 0 涓哄钩鍙癐D銆� 1涓虹洰褰旾D
+ // 涓嶅瓨鍦ㄨ繖涓钩鍙帮紝鍒欏拷鐣ュ鍏ユ鍏宠仈鍏崇郴
+ if (platformInfoMap.get(platFormInfoArray[0]) == null
+ || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
+ logger.info("瀵煎叆鏁版嵁鏃朵笉瀛樺湪骞冲彴鎴栫洰褰晎}/{},宸插鍏ユ湭鍒嗛厤", platFormInfoArray[0], platFormInfoArray[1] );
+ continue;
+ }
+ streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
+ List<GbStream> gbStreamList = platformForEvent.get(platFormInfoArray[0]);
+ if (gbStreamList == null) {
+ gbStreamList = new ArrayList<>();
+ platformForEvent.put(platFormInfoArray[0], gbStreamList);
+ }
+ // 涓哄彂閫侀�氱煡鏁寸悊鏁版嵁
+ streamPushItemForPlatform.setName(streamPushItem.getName());
+ streamPushItemForPlatform.setApp(streamPushItem.getApp());
+ streamPushItemForPlatform.setStream(streamPushItem.getStream());
+ streamPushItemForPlatform.setGbId(streamPushItem.getGbId());
+ gbStreamList.add(streamPushItemForPlatform);
+ }
+ if (platFormInfoArray.length > 1) {
+ streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
+ }
+ streamPushItemListFroPlatform.add(streamPushItemForPlatform);
+ }
+
+ }
+ }
+ if (streamPushItemListFroPlatform.size() > 0) {
+ platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform);
+ // 鍙戦�侀�氱煡
+ for (String platformId : platformForEvent.keySet()) {
+ eventPublisher.catalogEventPublishForStream(
+ platformId, platformForEvent.get(platformId), CatalogEvent.ADD);
+ }
+ }
+
+ }
+ }
+
+ @Override
+ public boolean batchStop(List<GbStream> gbStreams) {
+ if (gbStreams == null || gbStreams.size() == 0) {
+ return false;
+ }
+ gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
+
+ platformGbStreamMapper.delByGbStreams(gbStreams);
+ gbStreamMapper.batchDelForGbStream(gbStreams);
+ int delStream = streamPushMapper.delAllForGbStream(gbStreams);
+ if (delStream > 0) {
+ for (GbStream gbStream : gbStreams) {
+ MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
+ zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
+ }
+
+ }
+ return true;
+ }
}
--
Gitblit v1.8.0