| | |
| | | import com.genersoft.iot.vmp.gb28181.event.EventPublisher; |
| | | import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; |
| | | import com.genersoft.iot.vmp.media.bean.MediaInfo; |
| | | import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; |
| | | import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; |
| | | import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; |
| | | import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; |
| | | import com.genersoft.iot.vmp.media.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; |
| | | import com.genersoft.iot.vmp.media.bean.MediaServer; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; |
| | |
| | | @EventListener |
| | | public void onApplicationEvent(MediaDepartureEvent event) { |
| | | // 兼容流注销时类型从redis记录获取 |
| | | OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo( |
| | | MediaInfo mediaInfo = redisCatchStorage.getStreamInfo( |
| | | event.getApp(), event.getStream(), event.getMediaServer().getId()); |
| | | if (onStreamChangedHookParam != null) { |
| | | String type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); |
| | | if (mediaInfo != null) { |
| | | String type = OriginType.values()[mediaInfo.getOriginType()].getType(); |
| | | redisCatchStorage.removeStream(event.getMediaServer().getId(), type, event.getApp(), event.getStream()); |
| | | if ("PUSH".equalsIgnoreCase(type)) { |
| | | // 冗余数据,自己系统中自用 |
| | |
| | | streamPushItem.setStream(item.getStream()); |
| | | streamPushItem.setAliveSecond(item.getAliveSecond()); |
| | | streamPushItem.setOriginSock(item.getOriginSock()); |
| | | streamPushItem.setTotalReaderCount(item.getTotalReaderCount() + ""); |
| | | streamPushItem.setTotalReaderCount(item.getTotalReaderCount()); |
| | | streamPushItem.setOriginType(item.getOriginType()); |
| | | streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); |
| | | streamPushItem.setOriginUrl(item.getOriginUrl()); |
| | |
| | | List<StreamPushItem> pushList = getPushList(mediaServerId); |
| | | Map<String, StreamPushItem> pushItemMap = new HashMap<>(); |
| | | // redis记录 |
| | | List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH"); |
| | | Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>(); |
| | | List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PUSH"); |
| | | Map<String, MediaInfo> streamInfoPushItemMap = new HashMap<>(); |
| | | if (pushList.size() > 0) { |
| | | for (StreamPushItem streamPushItem : pushList) { |
| | | if (ObjectUtils.isEmpty(streamPushItem.getGbId())) { |
| | |
| | | } |
| | | } |
| | | } |
| | | if (onStreamChangedHookParams.size() > 0) { |
| | | for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) { |
| | | streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam); |
| | | if (mediaInfoList.size() > 0) { |
| | | for (MediaInfo mediaInfo : mediaInfoList) { |
| | | streamInfoPushItemMap.put(mediaInfo.getApp() + mediaInfo.getStream(), mediaInfo); |
| | | } |
| | | } |
| | | // 获取所有推流鉴权信息,清理过期的 |
| | |
| | | } |
| | | |
| | | } |
| | | Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values(); |
| | | if (offlineOnStreamChangedHookParamList.size() > 0) { |
| | | Collection<MediaInfo> mediaInfos = streamInfoPushItemMap.values(); |
| | | if (mediaInfos.size() > 0) { |
| | | String type = "PUSH"; |
| | | for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) { |
| | | for (MediaInfo mediaInfo : mediaInfos) { |
| | | JSONObject jsonObject = new JSONObject(); |
| | | jsonObject.put("serverId", userSetting.getServerId()); |
| | | jsonObject.put("app", offlineOnStreamChangedHookParam.getApp()); |
| | | jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream()); |
| | | jsonObject.put("app", mediaInfo.getApp()); |
| | | jsonObject.put("stream", mediaInfo.getStream()); |
| | | jsonObject.put("register", false); |
| | | jsonObject.put("mediaServerId", mediaServerId); |
| | | redisCatchStorage.sendStreamChangeMsg(type, jsonObject); |
| | | // 移除redis内流的信息 |
| | | redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream()); |
| | | redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", mediaInfo.getApp(), mediaInfo.getStream()); |
| | | // 冗余数据,自己系统中自用 |
| | | redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId()); |
| | | redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerItem.getId()); |
| | | } |
| | | } |
| | | |
| | |
| | | // 发送流停止消息 |
| | | String type = "PUSH"; |
| | | // 发送redis消息 |
| | | List<OnStreamChangedHookParam> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); |
| | | if (streamInfoList.size() > 0) { |
| | | for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) { |
| | | List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type); |
| | | if (mediaInfoList.size() > 0) { |
| | | for (MediaInfo mediaInfo : mediaInfoList) { |
| | | // 移除redis内流的信息 |
| | | redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); |
| | | redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream()); |
| | | JSONObject jsonObject = new JSONObject(); |
| | | jsonObject.put("serverId", userSetting.getServerId()); |
| | | jsonObject.put("app", onStreamChangedHookParam.getApp()); |
| | | jsonObject.put("stream", onStreamChangedHookParam.getStream()); |
| | | jsonObject.put("app", mediaInfo.getApp()); |
| | | jsonObject.put("stream", mediaInfo.getStream()); |
| | | jsonObject.put("register", false); |
| | | jsonObject.put("mediaServerId", mediaServerId); |
| | | redisCatchStorage.sendStreamChangeMsg(type, jsonObject); |
| | | |
| | | // 冗余数据,自己系统中自用 |
| | | redisCatchStorage.removePushListItem(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), mediaServerId); |
| | | redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerId); |
| | | } |
| | | } |
| | | } |
| | |
| | | public Map<String, StreamPushItem> getAllAppAndStreamMap() { |
| | | return streamPushMapper.getAllAppAndStreamMap(); |
| | | } |
| | | |
| | | @Override |
| | | public void updatePush(OnStreamChangedHookParam param) { |
| | | StreamPushItem transform = transform(param); |
| | | StreamPushItem pushInDb = getPush(param.getApp(), param.getStream()); |
| | | transform.setPushIng(param.isRegist()); |
| | | transform.setUpdateTime(DateUtil.getNow()); |
| | | transform.setPushTime(DateUtil.getNow()); |
| | | transform.setSelf(userSetting.getServerId().equals(param.getSeverId())); |
| | | if (pushInDb == null) { |
| | | transform.setCreateTime(DateUtil.getNow()); |
| | | streamPushMapper.add(transform); |
| | | }else { |
| | | streamPushMapper.update(transform); |
| | | gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId()); |
| | | } |
| | | } |
| | | } |