修复转发国标notify-update时信息错误的问题
| | |
| | | ParentPlatform parentPlatform = null; |
| | | |
| | | Map<String, List<ParentPlatform>> parentPlatformMap = new HashMap<>(); |
| | | Map<String, DeviceChannel> channelMap = new HashMap<>(); |
| | | if (!ObjectUtils.isEmpty(event.getPlatformId())) { |
| | | subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId()); |
| | | if (subscribe == null) { |
| | |
| | | for (DeviceChannel deviceChannel : event.getDeviceChannels()) { |
| | | List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(deviceChannel.getChannelId(), platforms); |
| | | parentPlatformMap.put(deviceChannel.getChannelId(), parentPlatformsForGB); |
| | | channelMap.put(deviceChannel.getChannelId(), deviceChannel); |
| | | } |
| | | } |
| | | }else if (event.getGbStreams() != null) { |
| | |
| | | } |
| | | logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId); |
| | | List<DeviceChannel> deviceChannelList = new ArrayList<>(); |
| | | DeviceChannel deviceChannel = storager.queryChannelInParentPlatform(platform.getServerGBId(), gbId); |
| | | DeviceChannel deviceChannel = channelMap.get(gbId); |
| | | deviceChannelList.add(deviceChannel); |
| | | GbStream gbStream = storager.queryStreamInParentPlatform(platform.getServerGBId(), gbId); |
| | | if(gbStream != null){ |
| | |
| | | Integer finalIndex = index; |
| | | String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels, |
| | | deviceChannels.size(), type, subscribeInfo); |
| | | System.out.println(catalogXmlContent); |
| | | logger.info("[发送NOTIFY通知]类型: {},发送数量: {}", type, channels.size()); |
| | | sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> { |
| | | logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg); |
| | |
| | | |
| | | private String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, List<DeviceChannel> channels, int sumNum, String type, SubscribeInfo subscribeInfo) { |
| | | StringBuffer catalogXml = new StringBuffer(600); |
| | | |
| | | String characterSet = parentPlatform.getCharacterSet(); |
| | | catalogXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet + "\"?>\r\n") |
| | | .append("<Notify>\r\n") |
| | |
| | | import com.google.common.primitives.Bytes; |
| | | import gov.nist.javax.sip.message.SIPRequest; |
| | | import gov.nist.javax.sip.message.SIPResponse; |
| | | import org.apache.commons.lang3.ArrayUtils; |
| | | import org.dom4j.Document; |
| | | import org.dom4j.DocumentException; |
| | | import org.dom4j.Element; |
| | |
| | | return getRootElement(evt, "gb2312"); |
| | | } |
| | | public Element getRootElement(RequestEvent evt, String charset) throws DocumentException { |
| | | |
| | | if (charset == null) { |
| | | charset = "gb2312"; |
| | | } |
| | |
| | | |
| | | import javax.sip.RequestEvent; |
| | | import javax.sip.header.FromHeader; |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | |
| | | |
| | | private final static String talkKey = "notify-request-for-mobile-position-task"; |
| | | |
| | | // @Async("taskExecutor") |
| | | @Async("taskExecutor") |
| | | public void process(RequestEvent evt) { |
| | | try { |
| | | FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); |
| | |
| | | deviceChannel.setGpsTime(mobilePosition.getTime()); |
| | | updateChannelMap.put(deviceId + channelId, deviceChannel); |
| | | addMobilePositionList.add(mobilePosition); |
| | | if(updateChannelMap.size() > 100) { |
| | | if(updateChannelMap.size() > 2000) { |
| | | executeSaveChannel(); |
| | | } |
| | | if (userSetting.isSavePositionHistory()) { |
| | | if(addMobilePositionList.size() > 100) { |
| | | if(addMobilePositionList.size() > 2000) { |
| | | executeSaveMobilePosition(); |
| | | } |
| | | } |
| | |
| | | dynamicTask.execute(); |
| | | try { |
| | | logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size()); |
| | | // ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); |
| | | // deviceChannelService.batchUpdateChannelGPS(deviceChannels); |
| | | ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); |
| | | deviceChannelService.batchUpdateChannelGPS(deviceChannels); |
| | | updateChannelMap.clear(); |
| | | }catch (Exception e) { |
| | | |
| | |
| | | public void executeSaveMobilePosition(){ |
| | | if (userSetting.isSavePositionHistory()) { |
| | | try { |
| | | // logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); |
| | | // deviceChannelService.batchAddMobilePosition(addMobilePositionList); |
| | | logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); |
| | | deviceChannelService.batchAddMobilePosition(addMobilePositionList); |
| | | addMobilePositionList.clear(); |
| | | }catch (Exception e) { |
| | | logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size()); |
| | |
| | | } else if (CmdType.MOBILE_POSITION.equals(cmd)) { |
| | | // logger.info("接收到MobilePosition通知"); |
| | | // processNotifyMobilePosition(take.getEvt()); |
| | | taskExecutor.execute(() -> { |
| | | // taskExecutor.execute(() -> { |
| | | notifyRequestForMobilePositionProcessor.process(take.getEvt()); |
| | | }); |
| | | // }); |
| | | |
| | | } else { |
| | | logger.info("接收到消息:" + cmd); |
| | |
| | | } else { |
| | | mobilePosition.setAltitude(0.0); |
| | | } |
| | | logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), |
| | | mobilePosition.getLongitude(), mobilePosition.getLatitude()); |
| | | // logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), |
| | | // mobilePosition.getLongitude(), mobilePosition.getLatitude()); |
| | | mobilePosition.setReportSource("Mobile Position"); |
| | | |
| | | // 更新device channel 的经纬度 |
| | |
| | | this.redisCatchStorage = redisCatchStorage; |
| | | } |
| | | |
| | | @Scheduled(fixedRate = 1000) //每1秒执行一次 |
| | | @Scheduled(fixedRate = 10000) //每1秒执行一次 |
| | | public void execute(){ |
| | | System.out.println("待处理消息数量: " + taskQueue.size()); |
| | | logger.info("[待处理Notify消息数量]: {}", taskQueue.size()); |
| | | } |
| | | } |
| | |
| | | logger.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", deviceId, channelId); |
| | | throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); |
| | | } |
| | | |
| | | Device device = redisCatchStorage.getDevice(deviceId); |
| | | if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) { |
| | | logger.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId); |
| | |
| | | InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); |
| | | if (inviteInfo != null ) { |
| | | if (inviteInfo.getStreamInfo() == null) { |
| | | // 释放生成的ssrc,使用上一次申请的 |
| | | ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc); |
| | | // 点播发起了但是尚未成功, 仅注册回调等待结果即可 |
| | | inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback); |
| | | logger.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId: {}", device.getDeviceId(), channelId); |