old mode 100644
new mode 100755
| | |
| | | package com.genersoft.iot.vmp.media.zlm; |
| | | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.genersoft.iot.vmp.conf.UserSetup; |
| | | import com.genersoft.iot.vmp.conf.UserSetting; |
| | | import com.genersoft.iot.vmp.gb28181.bean.GbStream; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.*; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; |
| | | import com.genersoft.iot.vmp.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.service.IStreamProxyService; |
| | | import com.genersoft.iot.vmp.service.IStreamPushService; |
| | | import com.genersoft.iot.vmp.service.bean.ThirdPartyGB; |
| | | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
| | | import com.genersoft.iot.vmp.storager.IVideoManagerStorager; |
| | | import com.genersoft.iot.vmp.storager.IVideoManagerStorage; |
| | | import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; |
| | | import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; |
| | | import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; |
| | | import com.genersoft.iot.vmp.utils.DateUtil; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.StringUtils; |
| | | |
| | | import java.text.ParseException; |
| | | import java.util.*; |
| | | import java.util.regex.Matcher; |
| | | import java.util.regex.Pattern; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | |
| | | /** |
| | | * @author lin |
| | | */ |
| | | @Component |
| | | public class ZLMMediaListManager { |
| | | |
| | |
| | | private IRedisCatchStorage redisCatchStorage; |
| | | |
| | | @Autowired |
| | | private IVideoManagerStorager storager; |
| | | private IVideoManagerStorage storager; |
| | | |
| | | @Autowired |
| | | private GbStreamMapper gbStreamMapper; |
| | |
| | | private StreamPushMapper streamPushMapper; |
| | | |
| | | @Autowired |
| | | private ZLMHttpHookSubscribe subscribe; |
| | | private ZlmHttpHookSubscribe subscribe; |
| | | |
| | | @Autowired |
| | | private UserSetup userSetup; |
| | | private UserSetting userSetting; |
| | | |
| | | @Autowired |
| | | private ZLMServerFactory zlmServerFactory; |
| | | |
| | | public void updateMediaList(MediaServerItem mediaServerItem) { |
| | | storager.clearMediaList(); |
| | | @Autowired |
| | | private IMediaServerService mediaServerService; |
| | | |
| | | // 使用异步的当时更新媒体流列表 |
| | | zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ |
| | | if (mediaList == null) return; |
| | | String dataStr = mediaList.getString("data"); |
| | | private Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>(); |
| | | |
| | | Integer code = mediaList.getInteger("code"); |
| | | Map<String, StreamPushItem> result = new HashMap<>(); |
| | | List<StreamPushItem> streamPushItems = null; |
| | | // 获取所有的国标关联 |
| | | // List<GbStream> gbStreams = gbStreamMapper.selectAllByMediaServerId(mediaServerItem.getId()); |
| | | if (code == 0 ) { |
| | | if (dataStr != null) { |
| | | streamPushItems = streamPushService.handleJSON(dataStr, mediaServerItem); |
| | | } |
| | | }else { |
| | | logger.warn("更新视频流失败,错误code: " + code); |
| | | public StreamPushItem addPush(OnStreamChangedHookParam onStreamChangedHookParam) { |
| | | StreamPushItem transform = streamPushService.transform(onStreamChangedHookParam); |
| | | StreamPushItem pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); |
| | | transform.setPushIng(onStreamChangedHookParam.isRegist()); |
| | | transform.setUpdateTime(DateUtil.getNow()); |
| | | transform.setPushTime(DateUtil.getNow()); |
| | | transform.setSelf(userSetting.getServerId().equals(onStreamChangedHookParam.getSeverId())); |
| | | if (pushInDb == null) { |
| | | transform.setCreateTime(DateUtil.getNow()); |
| | | streamPushMapper.add(transform); |
| | | }else { |
| | | streamPushMapper.update(transform); |
| | | gbStreamMapper.updateMediaServer(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), onStreamChangedHookParam.getMediaServerId()); |
| | | } |
| | | ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream()); |
| | | if ( channelOnlineEventLister != null) { |
| | | try { |
| | | channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());; |
| | | } catch (ParseException e) { |
| | | logger.error("addPush: ", e); |
| | | } |
| | | |
| | | if (streamPushItems != null) { |
| | | storager.updateMediaList(streamPushItems); |
| | | for (StreamPushItem streamPushItem : streamPushItems) { |
| | | JSONObject jsonObject = new JSONObject(); |
| | | jsonObject.put("app", streamPushItem.getApp()); |
| | | jsonObject.put("stream", streamPushItem.getStream()); |
| | | jsonObject.put("mediaServerId", mediaServerItem.getId()); |
| | | subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_play,jsonObject, |
| | | (MediaServerItem mediaServerItemInuse, JSONObject response)->{ |
| | | updateMedia(mediaServerItem, response.getString("app"), response.getString("stream")); |
| | | } |
| | | ); |
| | | } |
| | | } |
| | | })); |
| | | |
| | | removedChannelOnlineEventLister(transform.getApp(), transform.getStream()); |
| | | } |
| | | return transform; |
| | | } |
| | | |
| | | public void addMedia(MediaServerItem mediaServerItem, String app, String streamId) { |
| | | //使用异步更新推流 |
| | | updateMedia(mediaServerItem, app, streamId); |
| | | } |
| | | |
| | | public void addPush(MediaItem mediaItem) { |
| | | // 查找此直播流是否存在redis预设gbId |
| | | StreamPushItem transform = streamPushService.transform(mediaItem); |
| | | // 从streamId取出查询关键值 |
| | | Pattern pattern = Pattern.compile(userSetup.getThirdPartyGBIdReg()); |
| | | Matcher matcher = pattern.matcher(mediaItem.getStream());// 指定要匹配的字符串 |
| | | String queryKey = null; |
| | | if (matcher.find()) { //此处find()每次被调用后,会偏移到下一个匹配 |
| | | queryKey = matcher.group(); |
| | | } |
| | | if (queryKey != null) { |
| | | ThirdPartyGB thirdPartyGB = redisCatchStorage.queryMemberNoGBId(queryKey); |
| | | if (thirdPartyGB != null && !StringUtils.isEmpty(thirdPartyGB.getNationalStandardNo())) { |
| | | transform.setGbId(thirdPartyGB.getNationalStandardNo()); |
| | | transform.setName(thirdPartyGB.getName()); |
| | | } |
| | | } |
| | | storager.updateMedia(transform); |
| | | if (!StringUtils.isEmpty(transform.getGbId())) { |
| | | // 如果这个国标ID已经给了其他推流且流已离线,则移除其他推流 |
| | | List<GbStream> gbStreams = gbStreamMapper.selectByGBId(transform.getGbId()); |
| | | if (gbStreams.size() > 0) { |
| | | for (GbStream gbStream : gbStreams) { |
| | | // 出现使用相同国标Id的视频流时,使用新流替换旧流, |
| | | gbStreamMapper.del(gbStream.getApp(), gbStream.getStream()); |
| | | platformGbStreamMapper.delByAppAndStream(gbStream.getApp(), gbStream.getStream()); |
| | | if (!gbStream.isStatus()) { |
| | | streamPushMapper.del(gbStream.getApp(), gbStream.getStream()); |
| | | } |
| | | public void sendStreamEvent(String app, String stream, String mediaServerId) { |
| | | MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); |
| | | // 查看推流状态 |
| | | Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); |
| | | if (streamReady != null && streamReady) { |
| | | ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(app, stream); |
| | | if (channelOnlineEventLister != null) { |
| | | try { |
| | | channelOnlineEventLister.run(app, stream, mediaServerId); |
| | | } catch (ParseException e) { |
| | | logger.error("sendStreamEvent: ", e); |
| | | } |
| | | } |
| | | if (gbStreamMapper.selectOne(transform.getApp(), transform.getStream()) != null) { |
| | | gbStreamMapper.update(transform); |
| | | }else { |
| | | gbStreamMapper.add(transform); |
| | | removedChannelOnlineEventLister(app, stream); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | public void updateMedia(MediaServerItem mediaServerItem, String app, String streamId) { |
| | | //使用异步更新推流 |
| | | zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId, "rtmp", json->{ |
| | | |
| | | if (json == null) return; |
| | | String dataStr = json.getString("data"); |
| | | |
| | | Integer code = json.getInteger("code"); |
| | | Map<String, StreamPushItem> result = new HashMap<>(); |
| | | List<StreamPushItem> streamPushItems = null; |
| | | if (code == 0 ) { |
| | | if (dataStr != null) { |
| | | streamPushItems = streamPushService.handleJSON(dataStr, mediaServerItem); |
| | | } |
| | | }else { |
| | | logger.warn("更新视频流失败,错误code: " + code); |
| | | } |
| | | |
| | | if (streamPushItems != null && streamPushItems.size() == 1) { |
| | | storager.updateMedia(streamPushItems.get(0)); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | |
| | | public int removeMedia(String app, String streamId) { |
| | | // 查找是否关联了国标, 关联了不删除, 置为离线 |
| | | StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(app, streamId); |
| | | int result = 0; |
| | | if (streamProxyItem == null) { |
| | | GbStream gbStream = gbStreamMapper.selectOne(app, streamId); |
| | | int result; |
| | | if (gbStream == null) { |
| | | result = storager.removeMedia(app, streamId); |
| | | }else { |
| | | result =storager.mediaOutline(app, streamId); |
| | | result =storager.mediaOffline(app, streamId); |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) { |
| | | this.channelOnPublishEvents.put(app + "_" + stream, callback); |
| | | } |
| | | |
| | | public void removedChannelOnlineEventLister(String app, String stream) { |
| | | this.channelOnPublishEvents.remove(app + "_" + stream); |
| | | } |
| | | |
| | | // public void clearAllSessions() { |
| | | // logger.info("清空所有国标相关的session"); |
| | | // JSONObject allSessionJSON = zlmresTfulUtils.getAllSession(); |
| | | // ZLMServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); |
| | | // HashSet<String> allLocalPorts = new HashSet(); |
| | | // if (allSessionJSON.getInteger("code") == 0) { |
| | | // JSONArray data = allSessionJSON.getJSONArray("data"); |
| | | // if (data.size() > 0) { |
| | | // for (int i = 0; i < data.size(); i++) { |
| | | // JSONObject sessionJOSN = data.getJSONObject(i); |
| | | // Integer local_port = sessionJOSN.getInteger("local_port"); |
| | | // if (!local_port.equals(Integer.valueOf(mediaInfo.getHttpPort())) && |
| | | // !local_port.equals(Integer.valueOf(mediaInfo.getHttpSSLport())) && |
| | | // !local_port.equals(Integer.valueOf(mediaInfo.getRtmpPort())) && |
| | | // !local_port.equals(Integer.valueOf(mediaInfo.getRtspPort())) && |
| | | // !local_port.equals(Integer.valueOf(mediaInfo.getRtspSSlport())) && |
| | | // !local_port.equals(Integer.valueOf(mediaInfo.getHookOnFlowReport()))){ |
| | | // allLocalPorts.add(sessionJOSN.getInteger("local_port") + ""); |
| | | // } |
| | | // } |
| | | // } |
| | | // } |
| | | // if (allLocalPorts.size() > 0) { |
| | | // List<String> result = new ArrayList<>(allLocalPorts); |
| | | // String localPortSStr = String.join(",", result); |
| | | // zlmresTfulUtils.kickSessions(localPortSStr); |
| | | // } |
| | | // } |
| | | public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) { |
| | | return this.channelOnPublishEvents.get(app + "_" + stream); |
| | | } |
| | | |
| | | } |