648540858
2023-06-28 a77628e8759901abc3219412fc2c4aced940db28
优化端口预占用,防止占用无法释放
4个文件已修改
1个文件已添加
273 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 76 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java 84 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java 34 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java
New file
@@ -0,0 +1,46 @@
package com.genersoft.iot.vmp.gb28181.bean;
import javax.sdp.SessionDescription;
/**
 * 28181 的SDP解析器
 */
public class Gb28181Sdp  {
    private SessionDescription baseSdb;
    private String ssrc;
    private String mediaDescription;
    public static Gb28181Sdp getInstance(SessionDescription baseSdb, String ssrc, String mediaDescription) {
        Gb28181Sdp gb28181Sdp = new Gb28181Sdp();
        gb28181Sdp.setBaseSdb(baseSdb);
        gb28181Sdp.setSsrc(ssrc);
        gb28181Sdp.setMediaDescription(mediaDescription);
        return gb28181Sdp;
    }
    public SessionDescription getBaseSdb() {
        return baseSdb;
    }
    public void setBaseSdb(SessionDescription baseSdb) {
        this.baseSdb = baseSdb;
    }
    public String getSsrc() {
        return ssrc;
    }
    public void setSsrc(String ssrc) {
        this.ssrc = ssrc;
    }
    public String getMediaDescription() {
        return mediaDescription;
    }
    public void setMediaDescription(String mediaDescription) {
        this.mediaDescription = mediaDescription;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -241,21 +241,8 @@
                // 解析sdp消息, 使用jainsip 自带的sdp解析方式
                String contentString = new String(request.getRawContent());
                // jainSip不支持y=字段, 移除以解析。
                int ssrcIndex = contentString.indexOf("y=");
                // 检查是否有y字段
                String ssrcDefault = "0000000000";
                String ssrc;
                SessionDescription sdp;
                if (ssrcIndex >= 0) {
                    //ssrc规定长度为10个字节,不取余下长度以避免后续还有“f=”字段
                    ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
                    String substring = contentString.substring(0, contentString.indexOf("y="));
                    sdp = SdpFactory.getInstance().createSessionDescription(substring);
                } else {
                    ssrc = ssrcDefault;
                    sdp = SdpFactory.getInstance().createSessionDescription(contentString);
                }
                Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString);
                SessionDescription sdp = gb28181Sdp.getBaseSdb();
                String sessionName = sdp.getSessionName().getValue();
                Long startTime = null;
@@ -317,7 +304,7 @@
                String username = sdp.getOrigin().getUsername();
                String addressStr = sdp.getConnection().getAddress();
                logger.info("[上级点播]用户:{}, 通道:{}, 地址:{}:{}, ssrc:{}", username, channelId, addressStr, port, ssrc);
                Device device = null;
                // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
                if (channel != null) {
@@ -341,8 +328,30 @@
                        }
                        return;
                    }
                    String ssrc;
                    if (gb28181Sdp.getSsrc() == null) {
                        // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                        ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
                        logger.warn("[上级Invite] {} 平台:{}, 通道:{}, 缺少 ssrc,补充为: {}", sessionName, username, channelId, ssrc);
                    }else {
                        ssrc = gb28181Sdp.getSsrc();
                    }
                    String streamTypeStr = null;
                    if (mediaTransmissionTCP) {
                        if (tcpActive) {
                            streamTypeStr = "TCP-ACTIVE";
                        }else {
                            streamTypeStr = "TCP-PASSIVE";
                        }
                    }else {
                        streamTypeStr = "UDP";
                    }
                    logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                            device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
                            device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> {
                                return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
                            });
                    if (tcpActive != null) {
                        sendRtpItem.setTcpActive(tcpActive);
@@ -469,7 +478,7 @@
                            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());
                            logger.info(JSONObject.toJSONString(ssrcInfo));
                            sendRtpItem.setStreamId(ssrcInfo.getStream());
                            sendRtpItem.setSsrc(ssrc.equals(ssrcDefault) ? ssrcInfo.getSsrc() : ssrc);
                            sendRtpItem.setSsrc(ssrc);
                            // 写入redis, 超时时回复
                            redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -480,12 +489,7 @@
                            });
                        } else {
                            // 当前系统作为下级平台使用,当上级平台点播时不携带ssrc时,并且设备在当前系统中已经点播了。这个时候需要重新给生成一个ssrc,不使用默认的"0000000000"。
                            if (ssrc.equals(ssrcDefault)) {
                                ssrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
                                ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
                                sendRtpItem.setSsrc(ssrc);
                            }
                            sendRtpItem.setSsrc(ssrc);
                            sendRtpItem.setStreamId(playTransaction.getStream());
                            // 写入redis, 超时时回复
                            redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -496,11 +500,15 @@
                        }
                    }
                } else if (gbStream != null) {
                    if(ssrc.equals(ssrcDefault))
                    {
                        ssrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
                        ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
                    String ssrc;
                    if (gb28181Sdp.getSsrc() == null) {
                        // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                        ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
                    }else {
                        ssrc = gb28181Sdp.getSsrc();
                    }
                    if("push".equals(gbStream.getStreamType())) {
                        if (streamPushItem != null && streamPushItem.isPushIng()) {
                            // 推流状态
@@ -545,7 +553,9 @@
            if (streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{
                            return  redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
                        });
                if (sendRtpItem == null) {
                    logger.warn("服务器端口资源不足");
@@ -584,7 +594,9 @@
            if (streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{
                            return  redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
                        });
                if (sendRtpItem == null) {
                    logger.warn("服务器端口资源不足");
@@ -701,7 +713,9 @@
                dynamicTask.stop(callIdHeader.getCallId());
                if (serverId.equals(userSetting.getServerId())) {
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> {
                                return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
                            });
                    if (sendRtpItem == null) {
                        logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");
src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java
@@ -1,14 +1,22 @@
package com.genersoft.iot.vmp.gb28181.utils;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.Gb28181Sdp;
import com.genersoft.iot.vmp.gb28181.bean.RemoteAddressInfo;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.GitUtil;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
import gov.nist.javax.sip.header.Subject;
import gov.nist.javax.sip.message.SIPRequest;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
import javax.sdp.SdpFactory;
import javax.sdp.SdpParseException;
import javax.sdp.SessionDescription;
import javax.sip.PeerUnavailableException;
import javax.sip.SipFactory;
import javax.sip.header.FromHeader;
@@ -16,6 +24,8 @@
import javax.sip.header.UserAgentHeader;
import javax.sip.message.Request;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -27,6 +37,8 @@
 * @createTime 2021年09月27日 15:12:00
 */
public class SipUtils {
    private final static Logger logger = LoggerFactory.getLogger(SipUtils.class);
    public static String getUserIdFromFromHeader(Request request) {
        FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
@@ -51,7 +63,7 @@
    }
    public static  String getNewViaTag() {
        return "z9hG4bK" + System.currentTimeMillis();
        return "z9hG4bK" + RandomStringUtils.randomNumeric(10);
    }
    public static UserAgentHeader createUserAgentHeader(GitUtil gitUtil) throws PeerUnavailableException, ParseException {
@@ -113,6 +125,12 @@
        strTmp = String.format("%02X", moveSpeed);
        builder.append(strTmp, 0, 2);
        builder.append(strTmp, 0, 2);
        //优化zoom低倍速下的变倍速率
        if ((zoomSpeed > 0) && (zoomSpeed <16))
        {
            zoomSpeed = 16;
        }
        strTmp = String.format("%X", zoomSpeed);
        builder.append(strTmp, 0, 1).append("0");
        //计算校验码
@@ -183,4 +201,66 @@
        }
        return deviceChannel;
    }
}
    public static Gb28181Sdp parseSDP(String sdpStr) throws SdpParseException {
        // jainSip不支持y= f=字段, 移除以解析。
        int ssrcIndex = sdpStr.indexOf("y=");
        int mediaDescriptionIndex = sdpStr.indexOf("f=");
        // 检查是否有y字段
        SessionDescription sdp;
        String ssrc = null;
        String mediaDescription = null;
        if (mediaDescriptionIndex == 0 && ssrcIndex == 0) {
            sdp = SdpFactory.getInstance().createSessionDescription(sdpStr);
        }else {
            String lines[] = sdpStr.split("\\r?\\n");
            StringBuilder sdpBuffer = new StringBuilder();
            for (String line : lines) {
                if (line.trim().startsWith("y=")) {
                    ssrc = line.substring(2);
                }else if (line.trim().startsWith("f=")) {
                    mediaDescription = line.substring(2);
                }else {
                    sdpBuffer.append(line.trim()).append("\r\n");
                }
            }
            sdp = SdpFactory.getInstance().createSessionDescription(sdpBuffer.toString());
        }
        return Gb28181Sdp.getInstance(sdp, ssrc, mediaDescription);
    }
    public static String getSsrcFromSdp(String sdpStr) {
        // jainSip不支持y= f=字段, 移除以解析。
        int ssrcIndex = sdpStr.indexOf("y=");
        if (ssrcIndex == 0) {
            return null;
        }
        String lines[] = sdpStr.split("\\r?\\n");
        for (String line : lines) {
            if (line.trim().startsWith("y=")) {
                return line.substring(2);
            }
        }
        return null;
    }
    public static String parseTime(String timeStr) {
        if (ObjectUtils.isEmpty(timeStr)){
            return null;
        }
        LocalDateTime localDateTime;
        try {
            localDateTime = LocalDateTime.parse(timeStr);
        }catch (DateTimeParseException e) {
            try {
                localDateTime = LocalDateTime.parse(timeStr, DateUtil.formatterISO8601);
            }catch (DateTimeParseException e2) {
                logger.error("[格式化时间] 无法格式化时间: {}", timeStr);
                return null;
            }
        }
        return localDateTime.format(DateUtil.formatterISO8601);
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -219,13 +219,14 @@
     * @param tcp 是否为tcp
     * @return SendRtpItem
     */
    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp, boolean rtcp){
    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
                                         String deviceId, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){
        // 默认为随机端口
        int localPort = 0;
        if (userSetting.getGbSendStreamStrict()) {
            if (userSetting.getGbSendStreamStrict()) {
                localPort = keepPort(serverItem, ssrc);
                localPort = keepPort(serverItem, ssrc, localPort, callback);
                if (localPort == 0) {
                    return null;
                }
@@ -257,11 +258,12 @@
     * @param tcp 是否为tcp
     * @return SendRtpItem
     */
    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp, boolean rtcp){
    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
                                         String app, String stream, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){
        // 默认为随机端口
        int localPort = 0;
        if (userSetting.getGbSendStreamStrict()) {
            localPort = keepPort(serverItem, ssrc);
            localPort = keepPort(serverItem, ssrc, localPort, callback);
            if (localPort == 0) {
                return null;
            }
@@ -282,13 +284,16 @@
        return sendRtpItem;
    }
    public interface KeepPortCallback{
        Boolean keep(String ssrc);
    }
    /**
     * 保持端口,直到需要需要发流时再释放
     */
    public int keepPort(MediaServerItem serverItem, String ssrc) {
        int localPort = 0;
    public int keepPort(MediaServerItem serverItem, String ssrc, int localPort, KeepPortCallback keepPortCallback) {
        Map<String, Object> param = new HashMap<>(3);
        param.put("port", 0);
        param.put("port", localPort);
        param.put("enable_tcp", 1);
        param.put("stream_id", ssrc);
        JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param);
@@ -296,10 +301,21 @@
            localPort = jsonObject.getInteger("port");
            HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
            // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
            Integer finalLocalPort = localPort;
            hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
                    (MediaServerItem mediaServerItem, JSONObject response)->{
                        logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc);
                        keepPort(serverItem, ssrc);
                        System.out.println("监听端口到期继续保持监听");
                        System.out.println(response);
                        if (ssrc.equals(response.getString("stream_id"))) {
                            if (keepPortCallback.keep(ssrc)) {
                                logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc);
                                keepPort(serverItem, ssrc, finalLocalPort, keepPortCallback);
                            }else {
                                logger.info("[上级点播] {}->发送取消,无需继续监听", ssrc);
                                releasePort(serverItem, ssrc);
                            }
                        }
                    });
            logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort);
        }else {
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -13,6 +13,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,6 +27,7 @@
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -314,7 +316,9 @@
        SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
                content.getPort(), content.getSsrc(), content.getPlatformId(),
                content.getApp(), content.getStream(), content.getChannelId(),
                content.getTcp(), content.getRtcp());
                content.getTcp(), content.getRtcp(), ssrcFromCallback -> {
                    return querySendRTPServer(content.getPlatformId(), content.getChannelId(), content.getStream(), null) != null;
                });
        WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
        result.setCode(0);
@@ -391,4 +395,31 @@
        });
        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
    private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
        if (platformGbId == null) {
            platformGbId = "*";
        }
        if (channelId == null) {
            channelId = "*";
        }
        if (streamId == null) {
            streamId = "*";
        }
        if (callId == null) {
            callId = "*";
        }
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
                + userSetting.getServerId() + "_*_"
                + platformGbId + "_"
                + channelId + "_"
                + streamId + "_"
                + callId;
        List<Object> scan = RedisUtil.scan(redisTemplate, key);
        if (scan.size() > 0) {
            return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0));
        }else {
            return null;
        }
    }
}