lawrencehj
2021-03-14 a71063dd1fc25d99486b36ba65c3081a3c8c7c01
增加上级点播停止后通知设备停止推流功能,并自动与本地播放协同
6个文件已修改
108 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
@@ -156,6 +156,7 @@
            processor.setRequestEvent(evt);
            processor.setRedisCatchStorage(redisCatchStorage);
            processor.setZlmrtpServerFactory(zlmrtpServerFactory);
            processor.setSIPCommander(cmder);
            return processor;
        } else if (Request.CANCEL.equals(method)) {
            CancelRequestProcessor processor = new CancelRequestProcessor();
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java
@@ -1,16 +1,23 @@
package com.genersoft.iot.vmp.gb28181.transmit.request.impl;
import javax.sip.address.SipURI;
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.message.Response;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.apache.log4j.Logger;
import java.text.ParseException;
import java.util.HashMap;
@@ -18,10 +25,12 @@
/**    
 * @Description: BYE请求处理器
 * @author: swwheihei
 * @date:   2020年5月3日 下午5:32:05
 * @author: lawrencehj
 * @date:   2021年3月9日
 */
public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
    private ISIPCommander cmder;
    private IRedisCatchStorage redisCatchStorage;
@@ -38,10 +47,8 @@
            Dialog dialog = evt.getDialog();
            if (dialog == null) return;
            if (dialog.getState().equals(DialogState.TERMINATED)) {
                String remoteUri = dialog.getRemoteParty().getURI().toString();
                String localUri = dialog.getLocalParty().getURI().toString();
                String platformGbId = remoteUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@"));
                String channelId = localUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@"));
                String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
                String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
                SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, channelId);
                String streamId = sendRtpItem.getStreamId();
                Map<String, Object> param = new HashMap<>();
@@ -50,6 +57,11 @@
                param.put("stream",streamId);
                System.out.println("停止向上级推流:" + streamId);
                zlmrtpServerFactory.stopSendRtpStream(param);
                redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
                if (zlmrtpServerFactory.totalReaderCount(streamId) == 0) {
                    System.out.println(streamId + "无其它观看者,通知设备停止推流");
                    cmder.streamByeCmd(streamId);
                }
            }
        } catch (SipException e) {
            e.printStackTrace();
@@ -58,8 +70,6 @@
        } catch (ParseException e) {
            e.printStackTrace();
        }
        // TODO 优先级99 Bye Request消息实现,此消息一般为级联消息,上级给下级发送视频停止指令
    }
    /***
@@ -89,4 +99,13 @@
    /**
     * Injects the ZLM RTP server factory that this processor uses to stop
     * RTP pushes and to query reader counts while handling a BYE request.
     *
     * @param zlmrtpServerFactory the factory instance to use
     */
    public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) {
        this.zlmrtpServerFactory = zlmrtpServerFactory;
    }
    public ISIPCommander getSIPCommander() {
        return cmder;
    }
    /**
     * Injects the SIP commander that this processor uses to send a BYE to
     * the device once a stream has no remaining viewers.
     *
     * @param cmder the commander instance to use
     */
    public void setSIPCommander(ISIPCommander cmder) {
        this.cmder = cmder;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -267,20 +267,25 @@
        }
        
        String streamId = json.getString("stream");
        cmder.streamByeCmd(streamId);
        StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
        if (streamInfo!=null){
            redisCatchStorage.stopPlay(streamInfo);
            storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
        }else{
            streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
            redisCatchStorage.stopPlayback(streamInfo);
        }
        
        JSONObject ret = new JSONObject();
        ret.put("code", 0);
        ret.put("close", true);
        if (streamInfo != null) {
            if (redisCatchStorage.isChannelSendingRTP(streamInfo.getChannelId())) {
                ret.put("close", false);
            } else {
                cmder.streamByeCmd(streamId);
                redisCatchStorage.stopPlay(streamInfo);
                storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
            }
        }else{
            cmder.streamByeCmd(streamId);
            streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
            redisCatchStorage.stopPlayback(streamInfo);
        }
        return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
    }
    
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -153,6 +153,16 @@
    }
    /**
     * 查询转推的流是否有其它观看者
     * @param streamId
     * @return
     */
    public int totalReaderCount(String streamId) {
        JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId);
        return mediaInfo.getInteger("totalReaderCount");
    }
    /**
     * 调用zlm RESTful API —— stopSendRtp
     */
    public Boolean stopSendRtpStream(Map<String, Object>param) {
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -89,4 +89,17 @@
     */
    SendRtpItem querySendRTPServer(String platformGbId, String channelId);
    /**
     * Deletes the cached RTP push information for a platform/channel pair.
     *
     * @param platformGbId GB id of the upstream platform
     * @param channelId    id of the channel
     */
    void deleteSendRTPServer(String platformGbId, String channelId);
    /**
     * Checks whether a channel currently has an upstream-platform playback
     * (RTP push) recorded in the cache.
     *
     * @param channelId id of the channel to check
     * @return {@code true} if such an RTP push exists for the channel
     */
    boolean isChannelSendingRTP(String channelId);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -225,4 +225,30 @@
        return (SendRtpItem)redis.get(key);
    }
    /**
     * 删除RTP推送信息缓存
     * @param platformGbId
     * @param channelId
     */
    @Override
    public void deleteSendRTPServer(String platformGbId, String channelId) {
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + platformGbId + "_" + channelId;
        redis.del(key);
    }
    /**
     * 查询某个通道是否存在上级点播(RTP推送)
     * @param channelId
     */
    @Override
    public boolean isChannelSendingRTP(String channelId) {
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + "*_" + channelId;
        List<Object> RtpStreams = redis.scan(key);
        if (RtpStreams.size() > 0) {
            return true;
        } else {
            return false;
        }
    }
}