| | |
| | | import com.genersoft.iot.vmp.storager.dao.*; |
| | | import com.genersoft.iot.vmp.utils.DateUtil; |
| | | import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; |
| | | import com.genersoft.iot.vmp.vmanager.bean.StreamContent; |
| | | import com.github.pagehelper.PageHelper; |
| | | import com.github.pagehelper.PageInfo; |
| | | import org.slf4j.Logger; |
| | |
| | | streamPushMapper.update(transform); |
| | | gbStreamMapper.updateMediaServer(event.getApp(), event.getStream(), event.getMediaServer().getId()); |
| | | } |
| | | // TODO 相关的事件自行管理,不需要写入ZLMMediaListManager |
| | | // ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream()); |
| | | // if ( channelOnlineEventLister != null) { |
| | | // try { |
| | | // channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());; |
| | | // } catch (ParseException e) { |
| | | // logger.error("addPush: ", e); |
| | | // } |
| | | // removedChannelOnlineEventLister(transform.getApp(), transform.getStream()); |
| | | // } |
| | | // 冗余数据,自己系统中自用 |
| | | redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event); |
| | | if (!"broadcast".equals(event.getApp()) && !"talk".equals(event.getApp())) { |
| | | StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( |
| | | event.getMediaServer(), event.getApp(), event.getStream(), event.getMediaInfo(), event.getCallId()); |
| | | event.getHookParam().setStreamInfo(new StreamContent(streamInfo)); |
| | | redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event); |
| | | } |
| | | |
| | | // 发送流变化redis消息 |
| | | JSONObject jsonObject = new JSONObject(); |
| | |
| | | @EventListener |
| | | public void onApplicationEvent(MediaDepartureEvent event) { |
| | | // 兼容流注销时类型从redis记录获取 |
| | | OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo( |
| | | MediaInfo mediaInfo = redisCatchStorage.getStreamInfo( |
| | | event.getApp(), event.getStream(), event.getMediaServer().getId()); |
| | | if (onStreamChangedHookParam != null) { |
| | | String type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); |
| | | if (mediaInfo != null) { |
| | | String type = OriginType.values()[mediaInfo.getOriginType()].getType(); |
| | | redisCatchStorage.removeStream(event.getMediaServer().getId(), type, event.getApp(), event.getStream()); |
| | | if ("PUSH".equalsIgnoreCase(type)) { |
| | | // 冗余数据,自己系统中自用 |
| | |
| | | streamPushItem.setStream(item.getStream()); |
| | | streamPushItem.setAliveSecond(item.getAliveSecond()); |
| | | streamPushItem.setOriginSock(item.getOriginSock()); |
| | | streamPushItem.setTotalReaderCount(item.getTotalReaderCount() + ""); |
| | | streamPushItem.setTotalReaderCount(item.getTotalReaderCount()); |
| | | streamPushItem.setOriginType(item.getOriginType()); |
| | | streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); |
| | | streamPushItem.setOriginUrl(item.getOriginUrl()); |
| | |
| | | } |
| | | |
| | | /* Stops a pushed stream identified by app and stream id: logs the request, looks up the stored push item, sends a catalog DEL message when one exists, deletes the matching rows through platformGbStreamMapper, gbStreamMapper and streamPushMapper, and closes the stream on its media server when streamPushMapper.del reports a positive result. Always returns true. */ @Override |
| | | public boolean stop(String app, String streamId) { /* NOTE(review): this signature and the two statements after it are repeated right below with the parameter named stream instead of streamId; the span appears to interleave two revisions of the same method - confirm which one is current */ |
| | | logger.info("[推流 ] 停止流: {}/{}", app, streamId); |
| | | StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); |
| | | public boolean stop(String app, String stream) { |
| | | logger.info("[推流 ] 停止流: {}/{}", app, stream); /* logs app and stream before anything is removed */ |
| | | StreamPushItem streamPushItem = streamPushMapper.selectOne(app, stream); |
| | | if (streamPushItem != null) { /* the catalog DEL is only sent when selectOne returned an item */ |
| | | gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); |
| | | } |
| | | |
| | | platformGbStreamMapper.delByAppAndStream(app, streamId); /* this and the next two rows use streamId; the same three deletes follow with stream */ |
| | | gbStreamMapper.del(app, streamId); |
| | | int delStream = streamPushMapper.del(app, streamId); |
| | | platformGbStreamMapper.delByAppAndStream(app, stream); |
| | | gbStreamMapper.del(app, stream); |
| | | int delStream = streamPushMapper.del(app, stream); /* presumably the number of deleted rows - confirm against the mapper */ |
| | | if (delStream > 0) { |
| | | MediaServer mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); /* NOTE(review): streamPushItem is dereferenced here without the null check applied above */ |
| | | mediaServerService.closeStreams(mediaServerItem,app, streamId); |
| | | mediaServerService.closeStreams(mediaServerItem,app, stream); |
| | | } |
| | | return true; /* returned unconditionally, whether or not anything was deleted */ |
| | | } |
| | |
| | | List<StreamPushItem> pushList = getPushList(mediaServerId); |
| | | Map<String, StreamPushItem> pushItemMap = new HashMap<>(); |
| | | // redis记录 |
| | | List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH"); |
| | | Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>(); |
| | | List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PUSH"); |
| | | Map<String, MediaInfo> streamInfoPushItemMap = new HashMap<>(); |
| | | if (pushList.size() > 0) { |
| | | for (StreamPushItem streamPushItem : pushList) { |
| | | if (ObjectUtils.isEmpty(streamPushItem.getGbId())) { |
| | |
| | | } |
| | | } |
| | | } |
| | | if (onStreamChangedHookParams.size() > 0) { |
| | | for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) { |
| | | streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam); |
| | | if (mediaInfoList.size() > 0) { |
| | | for (MediaInfo mediaInfo : mediaInfoList) { |
| | | streamInfoPushItemMap.put(mediaInfo.getApp() + mediaInfo.getStream(), mediaInfo); |
| | | } |
| | | } |
| | | // 获取所有推流鉴权信息,清理过期的 |
| | |
| | | } |
| | | |
| | | } |
| | | Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values(); |
| | | if (offlineOnStreamChangedHookParamList.size() > 0) { |
| | | Collection<MediaInfo> mediaInfos = streamInfoPushItemMap.values(); |
| | | if (mediaInfos.size() > 0) { |
| | | String type = "PUSH"; |
| | | for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) { |
| | | for (MediaInfo mediaInfo : mediaInfos) { |
| | | JSONObject jsonObject = new JSONObject(); |
| | | jsonObject.put("serverId", userSetting.getServerId()); |
| | | jsonObject.put("app", offlineOnStreamChangedHookParam.getApp()); |
| | | jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream()); |
| | | jsonObject.put("app", mediaInfo.getApp()); |
| | | jsonObject.put("stream", mediaInfo.getStream()); |
| | | jsonObject.put("register", false); |
| | | jsonObject.put("mediaServerId", mediaServerId); |
| | | redisCatchStorage.sendStreamChangeMsg(type, jsonObject); |
| | | // 移除redis内流的信息 |
| | | redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream()); |
| | | redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", mediaInfo.getApp(), mediaInfo.getStream()); |
| | | // 冗余数据,自己系统中自用 |
| | | redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId()); |
| | | redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerItem.getId()); |
| | | } |
| | | } |
| | | |
| | |
| | | // 发送流停止消息 |
| | | String type = "PUSH"; |
| | | // 发送redis消息 |
| | | List<OnStreamChangedHookParam> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); |
| | | if (streamInfoList.size() > 0) { |
| | | for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) { |
| | | List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type); |
| | | if (mediaInfoList.size() > 0) { |
| | | for (MediaInfo mediaInfo : mediaInfoList) { |
| | | // 移除redis内流的信息 |
| | | redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); |
| | | redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream()); |
| | | JSONObject jsonObject = new JSONObject(); |
| | | jsonObject.put("serverId", userSetting.getServerId()); |
| | | jsonObject.put("app", onStreamChangedHookParam.getApp()); |
| | | jsonObject.put("stream", onStreamChangedHookParam.getStream()); |
| | | jsonObject.put("app", mediaInfo.getApp()); |
| | | jsonObject.put("stream", mediaInfo.getStream()); |
| | | jsonObject.put("register", false); |
| | | jsonObject.put("mediaServerId", mediaServerId); |
| | | redisCatchStorage.sendStreamChangeMsg(type, jsonObject); |
| | | |
| | | // 冗余数据,自己系统中自用 |
| | | redisCatchStorage.removePushListItem(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), mediaServerId); |
| | | redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerId); |
| | | } |
| | | } |
| | | } |
| | |
| | | public Map<String, StreamPushItem> getAllAppAndStreamMap() { |
| | | return streamPushMapper.getAllAppAndStreamMap(); |
| | | } |
| | | |
| | | @Override |
| | | public void updatePush(OnStreamChangedHookParam param) { |
| | | StreamPushItem transform = transform(param); |
| | | StreamPushItem pushInDb = getPush(param.getApp(), param.getStream()); |
| | | transform.setPushIng(param.isRegist()); |
| | | transform.setUpdateTime(DateUtil.getNow()); |
| | | transform.setPushTime(DateUtil.getNow()); |
| | | transform.setSelf(userSetting.getServerId().equals(param.getSeverId())); |
| | | if (pushInDb == null) { |
| | | transform.setCreateTime(DateUtil.getNow()); |
| | | streamPushMapper.add(transform); |
| | | }else { |
| | | streamPushMapper.update(transform); |
| | | gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId()); |
| | | } |
| | | } |
| | | } |