648540858
2021-12-08 2166ec93624b6b9d2f5702d30b9f5030a37d72b5
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -3,10 +3,15 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.OriginType;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
@@ -19,10 +24,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
@Service
public class StreamPushServiceImpl implements IStreamPushService {
@@ -43,6 +45,9 @@
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private UserSetup userSetup;
    @Autowired
    private IMediaServerService mediaServerService;
    @Override
@@ -55,7 +60,9 @@
        for (MediaItem item : mediaItems) {
            // 不保存国标推理以及拉流代理的流
            if (item.getOriginType() == 1 || item.getOriginType() == 2 || item.getOriginType() == 8) {
            if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
                    || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
                    || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
                String key = item.getApp() + "_" + item.getStream();
                StreamPushItem streamPushItem = result.get(key);
                if (streamPushItem == null) {
@@ -84,6 +91,7 @@
        streamPushItem.setCreateStamp(item.getCreateStamp());
        streamPushItem.setAliveSecond(item.getAliveSecond());
        streamPushItem.setStatus(true);
        streamPushItem.setStreamType("push");
        streamPushItem.setVhost(item.getVhost());
        return streamPushItem;
    }
@@ -93,6 +101,11 @@
        PageHelper.startPage(page, count);
        List<StreamPushItem> all = streamPushMapper.selectAll();
        return new PageInfo<>(all);
    }
    @Override
    public List<StreamPushItem> getPushList(String mediaServerId) {
        return streamPushMapper.selectAllByMediaServerId(mediaServerId);
    }
    @Override
@@ -134,4 +147,85 @@
        return true;
    }
    @Override
    public void zlmServerOnline(String mediaServerId) {
        // 同步zlm推流信息
        MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
        if (mediaServerItem == null) {
            return;
        }
        List<StreamPushItem> pushList = getPushList(mediaServerId);
        if (pushList.size() > 0) {
            Map<String, StreamPushItem> pushItemMap = new HashMap<>();
            for (StreamPushItem streamPushItem : pushList) {
                pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
            }
            zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
                if (mediaList == null) return;
                String dataStr = mediaList.getString("data");
                Integer code = mediaList.getInteger("code");
                List<StreamPushItem> streamPushItems = null;
                if (code == 0 ) {
                    if (dataStr != null) {
                        streamPushItems = handleJSON(dataStr, mediaServerItem);
                    }
                }
                if (streamPushItems != null) {
                    for (StreamPushItem streamPushItem : streamPushItems) {
                        pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
                    }
                }
                Collection<StreamPushItem> offlinePushItems = pushItemMap.values();
                if (offlinePushItems.size() > 0) {
                    String type = "PUSH";
                    streamPushMapper.delAll(new ArrayList<>(offlinePushItems));
                    for (StreamPushItem offlinePushItem : offlinePushItems) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("serverId", userSetup.getServerId());
                        jsonObject.put("app", offlinePushItem.getApp());
                        jsonObject.put("stream", offlinePushItem.getStream());
                        jsonObject.put("register", false);
                        jsonObject.put("mediaServerId", mediaServerId);
                        redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
                        // 移除redis内流的信息
                        redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlinePushItem.getApp(), offlinePushItem.getStream());
                    }
                }
            }));
        }
    }
    @Override
    public void zlmServerOffline(String mediaServerId) {
        List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerId(mediaServerId);
        // 移除没有GBId的推流
        streamPushMapper.deleteWithoutGBId(mediaServerId);
        gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
        // 其他的流设置未启用
        gbStreamMapper.updateStatusByMediaServerId(mediaServerId, false);
        // 发送流停止消息
        String type = "PUSH";
        // 发送redis消息
        List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
        if (streamInfoList.size() > 0) {
            for (StreamInfo streamInfo : streamInfoList) {
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("serverId", userSetup.getServerId());
                jsonObject.put("app", streamInfo.getApp());
                jsonObject.put("stream", streamInfo.getStreamId());
                jsonObject.put("register", false);
                jsonObject.put("mediaServerId", mediaServerId);
                redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
                // 移除redis内流的信息
                redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
            }
        }
    }
    @Override
    public void clean() {
    }
}