648540858
2024-04-23 901dee2bf4c91fa92306b5d8aa66b3148658186c
修复转发国标notify-update时信息错误的问题
6个文件已修改
38 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
@@ -49,6 +49,7 @@
        ParentPlatform parentPlatform = null;
        Map<String, List<ParentPlatform>> parentPlatformMap = new HashMap<>();
        Map<String, DeviceChannel> channelMap = new HashMap<>();
        if (!ObjectUtils.isEmpty(event.getPlatformId())) {
            subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
            if (subscribe == null) {
@@ -67,6 +68,7 @@
                    for (DeviceChannel deviceChannel : event.getDeviceChannels()) {
                        List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(deviceChannel.getChannelId(), platforms);
                        parentPlatformMap.put(deviceChannel.getChannelId(), parentPlatformsForGB);
                        channelMap.put(deviceChannel.getChannelId(), deviceChannel);
                    }
                }
            }else if (event.getGbStreams() != null) {
@@ -174,7 +176,7 @@
                                }
                                logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
                                List<DeviceChannel> deviceChannelList = new ArrayList<>();
                                DeviceChannel deviceChannel = storager.queryChannelInParentPlatform(platform.getServerGBId(), gbId);
                                DeviceChannel deviceChannel = channelMap.get(gbId);
                                deviceChannelList.add(deviceChannel);
                                GbStream gbStream = storager.queryStreamInParentPlatform(platform.getServerGBId(), gbId);
                                if(gbStream != null){
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -597,6 +597,7 @@
        Integer finalIndex = index;
        String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels,
                deviceChannels.size(), type, subscribeInfo);
        System.out.println(catalogXmlContent);
        logger.info("[发送NOTIFY通知]类型: {},发送数量: {}", type, channels.size());
        sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
            logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
@@ -626,7 +627,6 @@
    private  String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, List<DeviceChannel> channels, int sumNum, String type, SubscribeInfo subscribeInfo) {
        StringBuffer catalogXml = new StringBuffer(600);
        String characterSet = parentPlatform.getCharacterSet();
        catalogXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet + "\"?>\r\n")
                .append("<Notify>\r\n")
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
@@ -6,7 +6,6 @@
import com.google.common.primitives.Bytes;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import org.apache.commons.lang3.ArrayUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
@@ -172,6 +171,7 @@
        return getRootElement(evt, "gb2312");
    }
    public Element getRootElement(RequestEvent evt, String charset) throws DocumentException {
        if (charset == null) {
            charset = "gb2312";
        }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -25,6 +25,7 @@
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -67,7 +68,7 @@
    private final static String talkKey = "notify-request-for-mobile-position-task";
//    @Async("taskExecutor")
    @Async("taskExecutor")
    public void process(RequestEvent evt) {
        try {
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
@@ -172,11 +173,11 @@
            deviceChannel.setGpsTime(mobilePosition.getTime());
            updateChannelMap.put(deviceId + channelId, deviceChannel);
            addMobilePositionList.add(mobilePosition);
            if(updateChannelMap.size() > 100) {
            if(updateChannelMap.size() > 2000) {
                executeSaveChannel();
            }
            if (userSetting.isSavePositionHistory()) {
                if(addMobilePositionList.size() > 100) {
                if(addMobilePositionList.size() > 2000) {
                    executeSaveMobilePosition();
                }
            }
@@ -212,8 +213,8 @@
        dynamicTask.execute();
        try {
            logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size());
//            ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
//            deviceChannelService.batchUpdateChannelGPS(deviceChannels);
            ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
            deviceChannelService.batchUpdateChannelGPS(deviceChannels);
            updateChannelMap.clear();
        }catch (Exception e) {
@@ -223,8 +224,8 @@
    public void executeSaveMobilePosition(){
        if (userSetting.isSavePositionHistory()) {
            try {
//                logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
//                deviceChannelService.batchAddMobilePosition(addMobilePositionList);
                logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
                deviceChannelService.batchAddMobilePosition(addMobilePositionList);
                addMobilePositionList.clear();
            }catch (Exception e) {
                logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size());
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -136,9 +136,9 @@
                        } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
//                            logger.info("接收到MobilePosition通知");
//                            processNotifyMobilePosition(take.getEvt());
                            taskExecutor.execute(() -> {
//                            taskExecutor.execute(() -> {
                                notifyRequestForMobilePositionProcessor.process(take.getEvt());
                            });
//                            });
                        } else {
                            logger.info("接收到消息:" + cmd);
@@ -226,8 +226,8 @@
            } else {
                mobilePosition.setAltitude(0.0);
            }
            logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
                    mobilePosition.getLongitude(), mobilePosition.getLatitude());
//            logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
//                    mobilePosition.getLongitude(), mobilePosition.getLatitude());
            mobilePosition.setReportSource("Mobile Position");
            // 更新device channel 的经纬度
@@ -370,8 +370,8 @@
        this.redisCatchStorage = redisCatchStorage;
    }
    // Scheduled job that reports how many entries are currently waiting in taskQueue.
    // NOTE(review): this diff view appears to list the removed and the added line of each
    // change together, so the annotation and the output statement each show up twice
    // (old line first, then new line) — confirm against the actual patch.
    @Scheduled(fixedRate = 1000)   // runs once every 1 second
    @Scheduled(fixedRate = 10000)   // runs once every 10 seconds (the original comment still said 1 second)
    public void execute(){
        System.out.println("待处理消息数量: " + taskQueue.size());
        logger.info("[待处理Notify消息数量]: {}", taskQueue.size());
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -149,7 +149,6 @@
            logger.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", deviceId, channelId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
        }
        Device device = redisCatchStorage.getDevice(deviceId);
        if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) {
            logger.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
@@ -163,6 +162,8 @@
        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
        if (inviteInfo != null ) {
            if (inviteInfo.getStreamInfo() == null) {
                // 释放生成的ssrc,使用上一次申请的
                ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
                // 点播发起了但是尚未成功, 仅注册回调等待结果即可
                inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
                logger.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId: {}", device.getDeviceId(), channelId);