From 82adc0cb23f3ee47322e78889cdaba57e9309000 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 21 三月 2023 15:55:24 +0800 Subject: [PATCH] 完善语音对讲级联 --- src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 220 ++++++++++++++++++++++++++++++++++-------------------- 1 files changed, 139 insertions(+), 81 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 1e00faa..6540e3e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -1,27 +1,36 @@ package com.genersoft.iot.vmp.service.impl; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.alibaba.fastjson.TypeReference; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import com.alibaba.fastjson2.TypeReference; +import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.*; +import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Service; -import org.springframework.util.StringUtils; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; +import org.springframework.util.ObjectUtils; import java.util.*; import java.util.stream.Collectors; @@ -36,6 +45,9 @@ @Autowired private StreamPushMapper streamPushMapper; + + @Autowired + private StreamProxyMapper streamProxyMapper; @Autowired private ParentPlatformMapper parentPlatformMapper; @@ -64,6 +76,16 @@ @Autowired private IMediaServerService mediaServerService; + @Autowired + DataSourceTransactionManager dataSourceTransactionManager; + + @Autowired + TransactionDefinition transactionDefinition; + + @Autowired + private MediaConfig mediaConfig; + + @Override public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) { if (jsonData == null) { @@ -72,8 +94,8 @@ Map<String, StreamPushItem> result = new HashMap<>(); - List<MediaItem> mediaItems = JSON.parseObject(jsonData, new TypeReference<List<MediaItem>>() {}); - for (MediaItem item : mediaItems) { + List<OnStreamChangedHookParam> onStreamChangedHookParams = JSON.parseObject(jsonData, new TypeReference<List<OnStreamChangedHookParam>>() {}); + for (OnStreamChangedHookParam item : onStreamChangedHookParams) { // 涓嶄繚瀛樺浗鏍囨帹鐞嗕互鍙婃媺娴佷唬鐞嗙殑娴� if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() @@ -91,7 +113,7 @@ return new ArrayList<>(result.values()); } @Override - public StreamPushItem transform(MediaItem item) { + public StreamPushItem transform(OnStreamChangedHookParam item) { StreamPushItem streamPushItem = new StreamPushItem(); streamPushItem.setApp(item.getApp()); streamPushItem.setMediaServerId(item.getMediaServerId()); @@ -102,7 +124,7 @@ streamPushItem.setOriginType(item.getOriginType()); streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); streamPushItem.setOriginUrl(item.getOriginUrl()); - streamPushItem.setCreateStamp(item.getCreateStamp() * 1000); + streamPushItem.setCreateTime(DateUtil.getNow()); streamPushItem.setAliveSecond(item.getAliveSecond()); streamPushItem.setStatus(true); streamPushItem.setStreamType("push"); @@ -127,31 +149,10 @@ public boolean saveToGB(GbStream stream) { stream.setStreamType("push"); stream.setStatus(true); - stream.setCreateStamp(System.currentTimeMillis()); + stream.setCreateTime(DateUtil.getNow()); + stream.setStreamType("push"); + stream.setMediaServerId(mediaConfig.getId()); int add = gbStreamMapper.add(stream); - - // 鏌ユ壘寮�鍚簡鍏ㄩ儴鐩存挱娴佸叡浜殑涓婄骇骞冲彴 - List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream(); - if (parentPlatforms.size() > 0) { - for (ParentPlatform parentPlatform : parentPlatforms) { - stream.setCatalogId(parentPlatform.getCatalogId()); - stream.setPlatformId(parentPlatform.getServerGBId()); - String streamId = stream.getStream(); - StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId()); - if (streamProxyItem == null) { - platformGbStreamMapper.add(stream); - eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD); - }else { - if (!streamProxyItem.getGbId().equals(stream.getGbId())) { - // 姝ゆ祦浣跨敤鍙︿竴涓浗鏍嘔d宸茬粡涓庤骞冲彴鍏宠仈锛岀Щ闄ゆ璁板綍 - platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId()); - platformGbStreamMapper.add(stream); - eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD); - } - } - } - } - return add > 0; } @@ -177,14 +178,15 @@ @Override public StreamPushItem getPush(String app, String streamId) { - return streamPushMapper.selectOne(app, streamId); } @Override public boolean stop(String app, String streamId) { StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); - gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); + if (streamPushItem != null) { + gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); + } platformGbStreamMapper.delByAppAndStream(app, streamId); gbStreamMapper.del(app, streamId); @@ -207,19 +209,25 @@ List<StreamPushItem> pushList = getPushList(mediaServerId); Map<String, StreamPushItem> pushItemMap = new HashMap<>(); // redis璁板綍 - List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH"); - Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>(); + List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH"); + Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>(); if (pushList.size() > 0) { for (StreamPushItem streamPushItem : pushList) { - if (StringUtils.isEmpty(streamPushItem.getGbId())) { + if (ObjectUtils.isEmpty(streamPushItem.getGbId())) { pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); } } } - if (mediaItems.size() > 0) { - for (MediaItem mediaItem : mediaItems) { - streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem); + if (onStreamChangedHookParams.size() > 0) { + for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) { + streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam); } + } + // 鑾峰彇鎵�鏈夋帹娴侀壌鏉冧俊鎭紝娓呯悊杩囨湡鐨� + List<StreamAuthorityInfo> allStreamAuthorityInfo = redisCatchStorage.getAllStreamAuthorityInfo(); + Map<String, StreamAuthorityInfo> streamAuthorityInfoInfoMap = new HashMap<>(); + for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) { + streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo); } zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ if (mediaList == null) { @@ -239,6 +247,7 @@ for (StreamPushItem streamPushItem : streamPushItems) { pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); } } List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); @@ -259,19 +268,27 @@ } } - Collection<MediaItem> offlineMediaItemList = streamInfoPushItemMap.values(); - if (offlineMediaItemList.size() > 0) { + Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values(); + if (offlineOnStreamChangedHookParamList.size() > 0) { String type = "PUSH"; - for (MediaItem offlineMediaItem : offlineMediaItemList) { + for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) { JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", offlineMediaItem.getApp()); - jsonObject.put("stream", offlineMediaItem.getStream()); + jsonObject.put("app", offlineOnStreamChangedHookParam.getApp()); + jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream()); jsonObject.put("register", false); jsonObject.put("mediaServerId", mediaServerId); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); // 绉婚櫎redis鍐呮祦鐨勪俊鎭� - redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream()); + redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream()); + } + } + + Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values(); + if (streamAuthorityInfos.size() > 0) { + for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) { + // 绉婚櫎redis鍐呮祦鐨勪俊鎭� + redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream()); } } })); @@ -284,19 +301,20 @@ streamPushMapper.deleteWithoutGBId(mediaServerId); gbStreamMapper.deleteWithoutGBId("push", mediaServerId); // 鍏朵粬鐨勬祦璁剧疆鏈惎鐢� - gbStreamMapper.updateStatusByMediaServerId(mediaServerId, false); + streamPushMapper.updateStatusByMediaServerId(mediaServerId, false); + streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false); // 鍙戦�佹祦鍋滄娑堟伅 String type = "PUSH"; // 鍙戦�乺edis娑堟伅 - List<MediaItem> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); + List<OnStreamChangedHookParam> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); if (streamInfoList.size() > 0) { - for (MediaItem mediaItem : streamInfoList) { + for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) { // 绉婚櫎redis鍐呮祦鐨勪俊鎭� - redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream()); + redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", mediaItem.getApp()); - jsonObject.put("stream", mediaItem.getStream()); + jsonObject.put("app", onStreamChangedHookParam.getApp()); + jsonObject.put("stream", onStreamChangedHookParam.getStream()); jsonObject.put("register", false); jsonObject.put("mediaServerId", mediaServerId); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); @@ -317,7 +335,7 @@ streamPushItem.setStreamType("push"); streamPushItem.setStatus(true); streamPushItem.setGbId("34020000004111" + gbId); - streamPushItem.setCreateStamp(System.currentTimeMillis()); + streamPushItem.setCreateTime(DateUtil.getNow()); gbId ++; } int limitCount = 30; @@ -340,39 +358,15 @@ public void batchAdd(List<StreamPushItem> streamPushItems) { streamPushMapper.addAll(streamPushItems); gbStreamMapper.batchAdd(streamPushItems); - // 鏌ユ壘寮�鍚簡鍏ㄩ儴鐩存挱娴佸叡浜殑涓婄骇骞冲彴 - List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream(); - if (parentPlatforms.size() > 0) { - for (StreamPushItem stream : streamPushItems) { - for (ParentPlatform parentPlatform : parentPlatforms) { - stream.setCatalogId(parentPlatform.getCatalogId()); - stream.setPlatformId(parentPlatform.getServerGBId()); - String streamId = stream.getStream(); - StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId()); - if (streamProxyItem == null) { - platformGbStreamMapper.add(stream); - eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD); - }else { - if (!streamProxyItem.getGbId().equals(stream.getGbId())) { - // 姝ゆ祦浣跨敤鍙︿竴涓浗鏍嘔d宸茬粡涓庤骞冲彴鍏宠仈锛岀Щ闄ゆ璁板綍 - platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId()); - platformGbStreamMapper.add(stream); - eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD); - stream.setGbId(streamProxyItem.getGbId()); - eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.DEL); - } - } - } - } - } } + @Override public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) { // 瀛樺偍鏁版嵁鍒皊tream_push琛� streamPushMapper.addAll(streamPushItems); List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream() - .filter(streamPushItem-> streamPushItem.getId() != null) + .filter(streamPushItem-> streamPushItem.getGbId() != null) .collect(Collectors.toList()); // 瀛樺偍鏁版嵁鍒癵b_stream琛紝 id浼氳繑鍥炲埌streamPushItemForGbStream閲� if (streamPushItemForGbStream.size() > 0) { @@ -453,7 +447,6 @@ platformId, platformForEvent.get(platformId), CatalogEvent.ADD); } } - } } @@ -476,4 +469,69 @@ } return true; } + + @Override + public void allStreamOffline() { + List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb(); + if (onlinePushers.size() == 0) { + return; + } + streamPushMapper.setAllStreamOffline(); + + // 鍙戦�侀�氱煡 + eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF); + } + + @Override + public void offline(List<StreamPushItemFromRedis> offlineStreams) { + // 鏇存柊閮ㄥ垎璁惧绂荤嚎 + List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams); + streamPushMapper.offline(offlineStreams); + // 鍙戦�侀�氱煡 + eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF); + } + + @Override + public void online(List<StreamPushItemFromRedis> onlineStreams) { + // 鏇存柊閮ㄥ垎璁惧涓婄嚎streamPushService + List<GbStream> onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams); + streamPushMapper.online(onlineStreams); + // 鍙戦�侀�氱煡 + eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON); + } + + @Override + public boolean add(StreamPushItem stream) { + stream.setUpdateTime(DateUtil.getNow()); + stream.setCreateTime(DateUtil.getNow()); + stream.setServerId(userSetting.getServerId()); + + // 鏀惧湪浜嬪姟鍐呮墽琛� + boolean result = false; + TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); + try { + int addStreamResult = streamPushMapper.add(stream); + if (!ObjectUtils.isEmpty(stream.getGbId())) { + stream.setStreamType("push"); + gbStreamMapper.add(stream); + } + dataSourceTransactionManager.commit(transactionStatus); + result = true; + }catch (Exception e) { + logger.error("鎵归噺绉婚櫎娴佷笌骞冲彴鐨勫叧绯绘椂閿欒", e); + dataSourceTransactionManager.rollback(transactionStatus); + } + return result; + } + + @Override + public List<String> getAllAppAndStream() { + + return streamPushMapper.getAllAppAndStream(); + } + + @Override + public ResourceBaceInfo getOverview() { + return streamPushMapper.getOverview(userSetting.isUsePushingAsStatus()); + } } -- Gitblit v1.8.0