648540858
2022-07-19 28b5cc39d0a2d9939f70b4c980a31d9b27fc1e4c
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -3,10 +3,10 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.TreeType;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
@@ -23,14 +23,19 @@
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.StringUtils;
import java.net.InetAddress;
import java.util.*;
/**
@@ -48,7 +53,7 @@
    private IMediaService mediaService;
    @Autowired
    private ZLMRESTfulUtils zlmresTfulUtils;;
    private ZLMRESTfulUtils zlmresTfulUtils;
    @Autowired
    private StreamProxyMapper streamProxyMapper;
@@ -61,9 +66,6 @@
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private SipConfig sipConfig;
    @Autowired
    private GbStreamMapper gbStreamMapper;
@@ -83,6 +85,12 @@
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    DataSourceTransactionManager dataSourceTransactionManager;
    @Autowired
    TransactionDefinition transactionDefinition;
    @Override
    public WVPResult<StreamInfo> save(StreamProxyItem param) {
@@ -99,6 +107,7 @@
            wvpResult.setMsg("保存失败");
            return wvpResult;
        }
        String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
                param.getStream() );
        param.setDst_url(dstUrl);
@@ -108,9 +117,9 @@
        boolean saveResult;
        // 更新
        if (videoManagerStorager.queryStreamProxy(param.getApp(), param.getStream()) != null) {
            saveResult = videoManagerStorager.updateStreamProxy(param);
            saveResult = updateStreamProxy(param);
        }else { // 新增
            saveResult = videoManagerStorager.addStreamProxy(param);
            saveResult = addStreamProxy(param);
        }
        if (saveResult) {
            result.append("保存成功");
@@ -124,7 +133,7 @@
                    if (param.isEnable_remove_none_reader()) {
                        del(param.getApp(), param.getStream());
                    }else {
                        videoManagerStorager.updateStreamProxy(param);
                        updateStreamProxy(param);
                    }
                }else {
@@ -154,6 +163,7 @@
                for (ParentPlatform parentPlatform : parentPlatforms) {
                    param.setPlatformId(parentPlatform.getServerGBId());
                    param.setCatalogId(parentPlatform.getCatalogId());
                    String stream = param.getStream();
                    StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(param.getApp(), stream, parentPlatform.getServerGBId());
                    if (streamProxyItems == null) {
@@ -166,6 +176,77 @@
        wvpResult.setMsg(result.toString());
        return wvpResult;
    }
    /**
     * 新增代理流
     * @param streamProxyItem
     * @return
     */
    private boolean addStreamProxy(StreamProxyItem streamProxyItem) {
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        boolean result = false;
        streamProxyItem.setStreamType("proxy");
        streamProxyItem.setStatus(true);
        String now = DateUtil.getNow();
        streamProxyItem.setCreateTime(now);
        try {
            if (streamProxyMapper.add(streamProxyItem) > 0) {
                if (!StringUtils.isEmpty(streamProxyItem.getGbId())) {
                    if (gbStreamMapper.add(streamProxyItem) < 0) {
                        //事务回滚
                        dataSourceTransactionManager.rollback(transactionStatus);
                        return false;
                    }
                }
            }else {
                //事务回滚
                dataSourceTransactionManager.rollback(transactionStatus);
                return false;
            }
            result = true;
            dataSourceTransactionManager.commit(transactionStatus);     //手动提交
        }catch (Exception e) {
            logger.error("向数据库添加流代理失败:", e);
            dataSourceTransactionManager.rollback(transactionStatus);
        }
        return result;
    }
    /**
     * 更新代理流
     * @param streamProxyItem
     * @return
     */
    @Override
    public boolean updateStreamProxy(StreamProxyItem streamProxyItem) {
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        boolean result = false;
        streamProxyItem.setStreamType("proxy");
        try {
            if (streamProxyMapper.update(streamProxyItem) > 0) {
                if (!StringUtils.isEmpty(streamProxyItem.getGbId())) {
                    if (gbStreamMapper.updateByAppAndStream(streamProxyItem) == 0) {
                        //事务回滚
                        dataSourceTransactionManager.rollback(transactionStatus);
                        return false;
                    }
                }
            } else {
                //事务回滚
                dataSourceTransactionManager.rollback(transactionStatus);
                return false;
            }
            dataSourceTransactionManager.commit(transactionStatus);     //手动提交
            result = true;
        }catch (Exception e) {
            e.printStackTrace();
            dataSourceTransactionManager.rollback(transactionStatus);
        }
        return result;
    }
    @Override
@@ -194,7 +275,9 @@
    @Override
    public JSONObject removeStreamProxyFromZlm(StreamProxyItem param) {
        if (param ==null) return null;
        if (param ==null) {
            return null;
        }
        MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
        JSONObject result = zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
        return result;
@@ -228,13 +311,16 @@
    public boolean start(String app, String stream) {
        boolean result = false;
        StreamProxyItem streamProxy = videoManagerStorager.queryStreamProxy(app, stream);
        if (!streamProxy.isEnable() &&  streamProxy != null) {
        if (!streamProxy.isEnable() ) {
            JSONObject jsonObject = addStreamProxyToZlm(streamProxy);
            if (jsonObject == null) return false;
            if (jsonObject == null) {
                return false;
            }
            System.out.println(jsonObject);
            if (jsonObject.getInteger("code") == 0) {
                result = true;
                streamProxy.setEnable(true);
                videoManagerStorager.updateStreamProxy(streamProxy);
                updateStreamProxy(streamProxy);
            }
        }
        return result;
@@ -246,9 +332,9 @@
        StreamProxyItem streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream);
        if (streamProxyDto != null && streamProxyDto.isEnable()) {
            JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto);
            if (jsonObject.getInteger("code") == 0) {
            if (jsonObject != null && jsonObject.getInteger("code") == 0) {
                streamProxyDto.setEnable(false);
                result = videoManagerStorager.updateStreamProxy(streamProxyDto);
                result = updateStreamProxy(streamProxyDto);
            }
        }
        return result;
@@ -314,7 +400,7 @@
        }
        streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
        // 其他的流设置离线
        streamProxyMapper.updateStatusByMediaServerId(false, mediaServerId);
        streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
        String type = "PULL";
        // 发送redis消息
@@ -341,7 +427,7 @@
    /**
     * Updates the status of the proxy stream identified by app and stream by
     * delegating to {@code streamProxyMapper.updateStatus}.
     *
     * @param status the status value to store
     * @param app    the application name of the stream
     * @param stream the stream id
     * @return the int value returned by the mapper call
     */
    @Override
    public int updateStatus(boolean status, String app, String stream) {
        // NOTE(review): two consecutive return statements with different argument orders
        // appear here; the second is unreachable. This looks like the old and new lines of
        // a diff kept side by side - confirm which order matches the mapper signature and
        // drop the other.
        return streamProxyMapper.updateStatus(status, app, stream);
        return streamProxyMapper.updateStatus(app, stream, status);
    }
    private void syncPullStream(String mediaServerId){