修复并发点播时可能出现的rtpServer开启但是还未收到流的情况,编码类型136,137,138默认开启音频通道
11个文件已修改
118 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 30 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/user/UserController.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/logback-spring-local.xml 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -356,7 +356,7 @@
//            String streamMode = device.getStreamMode().toUpperCase();
            logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId());
            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
            subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
                if (event != null) {
                    event.response(mediaServerItemInUse, json);
@@ -524,7 +524,7 @@
            CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtmp", mediaServerItem.getId());
            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
            // 添加订阅
            subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
                        if (hookEvent != null) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -16,9 +16,7 @@
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
@@ -89,6 +87,9 @@
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    private ZlmHttpHookSubscribe zlmHttpHookSubscribe;
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
@@ -400,7 +401,14 @@
                        if (playTransaction != null) {
                            Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream());
                            if (!streamReady) {
                                playTransaction = null;
                                boolean hasRtpServer = mediaServerService.checkRtpServer(mediaServerItem, "rtp", playTransaction.getStream());
                                if (hasRtpServer) {
                                    logger.info("[上级点播]已经开启rtpServer但是尚未收到流,开启监听流的到来");
                                    HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", playTransaction.getStream(), true, "rtsp", mediaServerItem.getId());
                                    zlmHttpHookSubscribe.addSubscribe(hookSubscribe, hookEvent);
                                }else {
                                    playTransaction = null;
                                }
                            }
                        }
                        if (playTransaction == null) {
@@ -564,7 +572,7 @@
        } else if ("push".equals(gbStream.getStreamType())) {
            if (!platform.isStartOfflinePush()) {
                // 平台设置中关闭了拉起离线的推流则直接回复
                responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable");
                responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
                return;
            }
            // 发送redis消息以使设备上线
src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java
@@ -203,6 +203,12 @@
            return null;
        }
        deviceChannel.setChannelId(channelId);
        int channelTypeCode = Integer.parseInt(channelId.substring(10, 13));
        if (channelTypeCode == 136 || channelTypeCode == 137 || channelTypeCode == 138) {
            deviceChannel.setHasAudio(true);
        }else {
            deviceChannel.setHasAudio(false);
        }
        if (event != null && !event.equals(CatalogEvent.ADD) && !event.equals(CatalogEvent.UPDATE)) {
            // 除了ADD和update情况下需要识别全部内容,
            return deviceChannel;
@@ -396,7 +402,6 @@
        } else {
            deviceChannel.setPTZType(Integer.parseInt(XmlUtil.getText(itemDevice, "PTZType")));
        }
        deviceChannel.setHasAudio(true); // 默认含有音频,播放时再检查是否有音频及是否AAC
        return deviceChannel;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -96,6 +96,10 @@
        if(rtpInfo.getInteger("code") == 0){
            if (rtpInfo.getBoolean("exist")) {
                result = rtpInfo.getInteger("local_port");
                if (result == 0) {
                    // 此时说明rtpServer已经创建但是流还没有推上来
                }
                return result;
            }
        }else if(rtpInfo.getInteger("code") == -2){
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
@@ -83,4 +83,6 @@
    MediaServerItem getDefaultMediaServer();
    void updateMediaServerKeepalive(String mediaServerId, JSONObject data);
    boolean checkRtpServer(MediaServerItem mediaServerItem, String rtp, String stream);
}
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -147,9 +147,11 @@
            if (streamId == null) {
                streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase();
            }
            int rtpServerPort = mediaServerItem.getRtpProxyPort();
            int rtpServerPort;
            if (mediaServerItem.isRtpEnable()) {
                rtpServerPort = zlmrtpServerFactory.createRTPServer(mediaServerItem, streamId, ssrcCheck?Integer.parseInt(ssrc):0, port);
            } else {
                rtpServerPort = mediaServerItem.getRtpProxyPort();
            }
            RedisUtil.set(key, mediaServerItem);
            return new SSRCInfo(rtpServerPort, ssrc, streamId);
@@ -681,4 +683,13 @@
            }
        }
    }
    @Override
    public boolean checkRtpServer(MediaServerItem mediaServerItem, String app, String stream) {
        JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, stream);
        if(rtpInfo.getInteger("code") == 0){
            return rtpInfo.getBoolean("exist");
        }
        return false;
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -164,17 +164,30 @@
            JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
            if(rtpInfo.getInteger("code") == 0){
                if (rtpInfo.getBoolean("exist")) {
                    int localPort = rtpInfo.getInteger("local_port");
                    if (localPort == 0) {
                        logger.warn("[点播],点播时发现rtpServerC存在,但是尚未开始推流");
                        // 此时说明rtpServer已经创建但是流还没有推上来
                        WVPResult wvpResult = new WVPResult();
                        wvpResult.setCode(ErrorCode.ERROR100.getCode());
                        wvpResult.setMsg("点播已经在进行中,请稍候重试");
                        msg.setData(wvpResult);
                    WVPResult wvpResult = new WVPResult();
                    wvpResult.setCode(ErrorCode.SUCCESS.getCode());
                    wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
                    wvpResult.setData(streamInfo);
                    msg.setData(wvpResult);
                        resultHolder.invokeAllResult(msg);
                        return playResult;
                    }else {
                        WVPResult wvpResult = new WVPResult();
                        wvpResult.setCode(ErrorCode.SUCCESS.getCode());
                        wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
                        wvpResult.setData(streamInfo);
                        msg.setData(wvpResult);
                    resultHolder.invokeAllResult(msg);
                    if (hookEvent != null) {
                        hookEvent.response(mediaServerItem, JSONObject.parseObject(JSON.toJSONString(streamInfo)));
                        resultHolder.invokeAllResult(msg);
                        if (hookEvent != null) {
                            hookEvent.response(mediaServerItem, JSONObject.parseObject(JSON.toJSONString(streamInfo)));
                        }
                    }
                }else {
                    redisCatchStorage.stopPlay(streamInfo);
                    storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
@@ -187,7 +200,6 @@
                streamInfo = null;
            }
        }
        if (streamInfo == null) {
            String streamId = null;
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -143,15 +143,12 @@
    @Update(value = {"UPDATE device_channel SET status=0 WHERE deviceId=#{deviceId}"})
    void offlineByDeviceId(String deviceId);
    @Update(value = {"UPDATE device_channel SET status=1 WHERE deviceId=#{deviceId} AND channelId=#{channelId}"})
    void online(String deviceId,  String channelId);
    @Insert("<script> " +
            "insert into device_channel " +
            "(channelId, deviceId, name, manufacture, model, owner, civilCode, block, subCount, " +
            "  address, parental, parentId, safetyWay, registerWay, certNum, certifiable, errCode, secrecy, " +
            "  ipAddress, port, password, PTZType, status, streamId, longitude, latitude, longitudeGcj02, latitudeGcj02, " +
            "  longitudeWgs84, latitudeWgs84, createTime, updateTime, businessGroupId, gpsTime) " +
            "  longitudeWgs84, latitudeWgs84, hasAudio, createTime, updateTime, businessGroupId, gpsTime) " +
            "values " +
            "<foreach collection='addChannels' index='index' item='item' separator=','> " +
            "('${item.channelId}', '${item.deviceId}', '${item.name}', '${item.manufacture}', '${item.model}', " +
@@ -160,7 +157,7 @@
            "'${item.certNum}', ${item.certifiable}, ${item.errCode}, '${item.secrecy}', " +
            "'${item.ipAddress}', ${item.port}, '${item.password}', ${item.PTZType}, ${item.status}, " +
            "'${item.streamId}', ${item.longitude}, ${item.latitude},${item.longitudeGcj02}, " +
            "${item.latitudeGcj02},${item.longitudeWgs84}, ${item.latitudeWgs84},'${item.createTime}', '${item.updateTime}', " +
            "${item.latitudeGcj02},${item.longitudeWgs84}, ${item.latitudeWgs84}, ${item.hasAudio},'${item.createTime}', '${item.updateTime}', " +
            "'${item.businessGroupId}', '${item.gpsTime}') " +
            "</foreach> " +
            "ON DUPLICATE KEY UPDATE " +
@@ -193,10 +190,14 @@
            "latitudeGcj02=VALUES(latitudeGcj02), " +
            "longitudeWgs84=VALUES(longitudeWgs84), " +
            "latitudeWgs84=VALUES(latitudeWgs84), " +
            "hasAudio=VALUES(hasAudio), " +
            "businessGroupId=VALUES(businessGroupId), " +
            "gpsTime=VALUES(gpsTime)" +
            "</script>")
    int batchAdd(List<DeviceChannel> addChannels);
    @Update(value = {"UPDATE device_channel SET status=1 WHERE deviceId=#{deviceId} AND channelId=#{channelId}"})
    void online(String deviceId,  String channelId);
    @Update({"<script>" +
            "<foreach collection='updateChannels' item='item' separator=';'>" +
@@ -341,4 +342,7 @@
            " left join platform_catalog pc on pgc.catalogId = pc.id and pgc.platformId = pc.platformId" +
            " where pgc.platformId=#{serverGBId}")
    List<DeviceChannel> queryChannelWithCatalog(String serverGBId);
    @Select("select * from device_channel where deviceId = #{deviceId}")
    List<DeviceChannel> queryAllChannels(String deviceId);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
@@ -111,11 +111,11 @@
        if (CollectionUtils.isEmpty(deviceChannelList)) {
            return false;
        }
        List<DeviceChannel> allChannelInPlay = deviceChannelMapper.getAllChannelInPlay();
        Map<String,DeviceChannel> allChannelMapInPlay = new ConcurrentHashMap<>();
        if (allChannelInPlay.size() > 0) {
            for (DeviceChannel deviceChannel : allChannelInPlay) {
                allChannelMapInPlay.put(deviceChannel.getChannelId(), deviceChannel);
        List<DeviceChannel> allChannels = deviceChannelMapper.queryAllChannels(deviceId);
        Map<String,DeviceChannel> allChannelMap = new ConcurrentHashMap<>();
        if (allChannels.size() > 0) {
            for (DeviceChannel deviceChannel : allChannels) {
                allChannelMap.put(deviceChannel.getChannelId(), deviceChannel);
            }
        }
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
@@ -123,15 +123,17 @@
        List<DeviceChannel> channels = new ArrayList<>();
        StringBuilder stringBuilder = new StringBuilder();
        Map<String, Integer> subContMap = new HashMap<>();
        if (deviceChannelList.size() > 1) {
        if (deviceChannelList.size() > 0) {
            // 数据去重
            Set<String> gbIdSet = new HashSet<>();
            for (DeviceChannel deviceChannel : deviceChannelList) {
                if (!gbIdSet.contains(deviceChannel.getChannelId())) {
                    gbIdSet.add(deviceChannel.getChannelId());
                    if (allChannelMapInPlay.containsKey(deviceChannel.getChannelId())) {
                        deviceChannel.setStreamId(allChannelMapInPlay.get(deviceChannel.getChannelId()).getStreamId());
                    if (allChannelMap.containsKey(deviceChannel.getChannelId())) {
                        deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId());
                        deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio());
                    }
                    channels.add(deviceChannel);
                    if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) {
                        if (subContMap.get(deviceChannel.getParentId()) == null) {
@@ -153,8 +155,6 @@
                }
            }
        }else {
            channels = deviceChannelList;
        }
        if (stringBuilder.length() > 0) {
            logger.info("[目录查询]收到的数据存在重复: {}" , stringBuilder);
src/main/java/com/genersoft/iot/vmp/vmanager/user/UserController.java
@@ -43,6 +43,7 @@
    private IRoleService roleService;
    @GetMapping("/login")
    @PostMapping("/login")
    @Operation(summary = "登录")
    @Parameter(name = "username", description = "用户名", required = true)
    @Parameter(name = "password", description = "密码(32位md5加密)", required = true)
src/main/resources/logback-spring-local.xml
@@ -98,6 +98,11 @@
        <appender-ref ref="STDOUT" />
    </root>
    <logger name="wvp" level="debug" additivity="true">
        <appender-ref ref="RollingFileError"/>
        <appender-ref ref="RollingFile"/>
    </logger>
    <logger name="GB28181_SIP" level="debug" additivity="true">
        <appender-ref ref="RollingFileError"/>
        <appender-ref ref="sipRollingFile"/>