648540858
2024-03-28 f1e902af7fd88138c76abcf1a17d04efb8d3293a
支持级联移动位置订阅通知转发
10个文件已修改
2个文件已添加
303 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java 61 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java 77 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
@@ -103,6 +103,16 @@
        return platforms;
    }
    public List<String> getAllMobilePositionSubscribePlatform() {
        List<String> platforms = new ArrayList<>();
        if(!mobilePositionMap.isEmpty()) {
            for (String key : mobilePositionMap.keySet()) {
                platforms.add(mobilePositionMap.get(key).getId());
            }
        }
        return platforms;
    }
    public void removeAllSubscribe(String platformId) {
        removeMobilePositionSubscribe(platformId);
        removeCatalogSubscribe(platformId);
src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
@@ -4,6 +4,7 @@
import com.genersoft.iot.vmp.gb28181.event.device.RequestTimeoutEvent;
import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEvent;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent;
import com.genersoft.iot.vmp.media.zlm.event.ZLMOfflineEvent;
import com.genersoft.iot.vmp.media.zlm.event.ZLMOnlineEvent;
import org.springframework.beans.factory.annotation.Autowired;
@@ -94,6 +95,13 @@
    }
    public void mobilePositionEventPublish(MobilePosition mobilePosition) {
        MobilePositionEvent event = new MobilePositionEvent(this);
        event.setMobilePosition(mobilePosition);
        applicationEventPublisher.publishEvent(event);
    }
    public void catalogEventPublishForStream(String platformId, List<GbStream> gbStreams, String type) {
        CatalogEvent outEvent = new CatalogEvent(this);
        outEvent.setGbStreams(gbStreams);
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java
New file
@@ -0,0 +1,20 @@
package com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import org.springframework.context.ApplicationEvent;
public class MobilePositionEvent extends ApplicationEvent {
    public MobilePositionEvent(Object source) {
        super(source);
    }
    private MobilePosition mobilePosition;
    public MobilePosition getMobilePosition() {
        return mobilePosition;
    }
    public void setMobilePosition(MobilePosition mobilePosition) {
        this.mobilePosition = mobilePosition;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java
New file
@@ -0,0 +1,61 @@
package com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.List;
/**
 * 移动位置通知消息转发
 */
@Component
public class MobilePositionEventLister implements ApplicationListener<MobilePositionEvent> {
    private final static Logger logger = LoggerFactory.getLogger(MobilePositionEventLister.class);
    @Autowired
    private IVideoManagerStorage storager;
    @Autowired
    private SIPCommanderFroPlatform sipCommanderFroPlatform;
    @Autowired
    private SubscribeHolder subscribeHolder;
    @Override
    public void onApplicationEvent(MobilePositionEvent event) {
        // 获取所用订阅
        List<String> platforms = subscribeHolder.getAllMobilePositionSubscribePlatform();
        if (platforms.isEmpty()) {
            return;
        }
        List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(event.getMobilePosition().getChannelId(), platforms);
        for (ParentPlatform platform : parentPlatformsForGB) {
            logger.info("[向上级发送MobilePosition] 通道:{},平台:{}, 位置: {}:{}", event.getMobilePosition().getChannelId(),
                    platform.getServerGBId(), event.getMobilePosition().getLongitude(), event.getMobilePosition().getLatitude());
            SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId());
            try {
                sipCommanderFroPlatform.sendNotifyMobilePosition(platform, GPSMsgInfo.getInstance(event.getMobilePosition()),
                        subscribe);
            } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
                     IllegalAccessException e) {
                logger.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
            }
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -1,7 +1,5 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
@@ -78,9 +76,6 @@
    @Autowired
    private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
    @Autowired
    private CivilCodeFileConf civilCodeFileConf;
    private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
@@ -98,7 +93,6 @@
    @Override
    public void process(RequestEvent evt) {
        try {
            if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
                responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null);
                logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
@@ -234,25 +228,8 @@
            mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
            mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
            if (userSetting.getSavePositionHistory()) {
                storager.insertMobilePosition(mobilePosition);
            }
            deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
            storager.updateChannelPosition(deviceChannel);
            // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
            // 发送redis消息。 通知位置信息的变化
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
            jsonObject.put("serial", deviceId);
            jsonObject.put("code", channelId);
            jsonObject.put("longitude", mobilePosition.getLongitude());
            jsonObject.put("latitude", mobilePosition.getLatitude());
            jsonObject.put("altitude", mobilePosition.getAltitude());
            jsonObject.put("direction", mobilePosition.getDirection());
            jsonObject.put("speed", mobilePosition.getSpeed());
            redisCatchStorage.sendMobilePositionMsg(jsonObject);
        } catch (DocumentException  e) {
            logger.error("未处理的异常 ", e);
        }
@@ -340,25 +317,8 @@
                mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
                mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
                if (userSetting.getSavePositionHistory()) {
                    storager.insertMobilePosition(mobilePosition);
                }
                storager.updateChannelPosition(deviceChannel);
                // 发送redis消息。 通知位置信息的变化
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
                jsonObject.put("serial", deviceChannel.getDeviceId());
                jsonObject.put("code", deviceChannel.getChannelId());
                jsonObject.put("longitude", mobilePosition.getLongitude());
                jsonObject.put("latitude", mobilePosition.getLatitude());
                jsonObject.put("altitude", mobilePosition.getAltitude());
                jsonObject.put("direction", mobilePosition.getDirection());
                jsonObject.put("speed", mobilePosition.getSpeed());
                redisCatchStorage.sendMobilePositionMsg(jsonObject);
                deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
            }
            // TODO: 需要实现存储报警信息、报警分类
            // 回复200 OK
            if (redisCatchStorage.deviceIsOnline(deviceId)) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
@@ -75,6 +75,9 @@
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    @Autowired
    private EventPublisher eventPublisher;
    @Override
    public void afterPropertiesSet() throws Exception {
@@ -158,22 +161,7 @@
                                mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
                                mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
                                if (userSetting.getSavePositionHistory()) {
                                    storager.insertMobilePosition(mobilePosition);
                                }
                                storager.updateChannelPosition(deviceChannel);
                                // 发送redis消息。 通知位置信息的变化
                                JSONObject jsonObject = new JSONObject();
                                jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
                                jsonObject.put("serial", deviceChannel.getDeviceId());
                                jsonObject.put("code", deviceChannel.getChannelId());
                                jsonObject.put("longitude", mobilePosition.getLongitude());
                                jsonObject.put("latitude", mobilePosition.getLatitude());
                                jsonObject.put("altitude", mobilePosition.getAltitude());
                                jsonObject.put("direction", mobilePosition.getDirection());
                                jsonObject.put("speed", mobilePosition.getSpeed());
                                redisCatchStorage.sendMobilePositionMsg(jsonObject);
                                deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
                            }
                        }
                        if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
@@ -1,8 +1,8 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
@@ -56,6 +56,9 @@
    @Autowired
    private IDeviceChannelService deviceChannelService;
    @Autowired
    private EventPublisher eventPublisher;
    private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
@@ -137,22 +140,7 @@
                        mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
                        mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
                        if (userSetting.getSavePositionHistory()) {
                            storager.insertMobilePosition(mobilePosition);
                        }
                        storager.updateChannelPosition(deviceChannel);
                        // 发送redis消息。 通知位置信息的变化
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
                        jsonObject.put("serial", deviceChannel.getDeviceId());
                        jsonObject.put("code", deviceChannel.getChannelId());
                        jsonObject.put("longitude", mobilePosition.getLongitude());
                        jsonObject.put("latitude", mobilePosition.getLatitude());
                        jsonObject.put("altitude", mobilePosition.getAltitude());
                        jsonObject.put("direction", mobilePosition.getDirection());
                        jsonObject.put("speed", mobilePosition.getSpeed());
                        redisCatchStorage.sendMobilePositionMsg(jsonObject);
                        deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
                    } catch (DocumentException e) {
                        logger.error("未处理的异常 ", e);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java
@@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
@@ -131,11 +130,7 @@
            mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
            mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
            if (userSetting.getSavePositionHistory()) {
                storager.insertMobilePosition(mobilePosition);
            }
            storager.updateChannelPosition(deviceChannel);
            deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
            String key = DeferredResultHolder.CALLBACK_CMD_MOBILE_POSITION + device.getDeviceId();
            RequestMessage msg = new RequestMessage();
@@ -143,17 +138,6 @@
            msg.setData(mobilePosition);
            resultHolder.invokeAllResult(msg);
            // 发送redis消息。 通知位置信息的变化
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
            jsonObject.put("serial", deviceChannel.getDeviceId());
            jsonObject.put("code", deviceChannel.getChannelId());
            jsonObject.put("longitude", mobilePosition.getLongitude());
            jsonObject.put("latitude", mobilePosition.getLatitude());
            jsonObject.put("altitude", mobilePosition.getAltitude());
            jsonObject.put("direction", mobilePosition.getDirection());
            jsonObject.put("speed", mobilePosition.getSpeed());
            redisCatchStorage.sendMobilePositionMsg(jsonObject);
            //回复 200 OK
            try {
                responseAck(request, Response.OK);
src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
@@ -2,6 +2,7 @@
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
@@ -92,4 +93,9 @@
     * 修改通道的码流类型
     */
    void updateChannelStreamIdentification(DeviceChannel channel);
    /**
     * Looks up the channels stored for a device.
     *
     * @param deviceId ID of the device whose channels are requested
     * @return the channels found for that device
     */
    List<DeviceChannel> queryChaneListByDeviceId(String deviceId);
    /**
     * Applies a reported GPS position to a channel and notifies interested parties about it.
     *
     * @param device         the device that reported the position
     * @param deviceChannel  the channel whose stored position is to be updated
     * @param mobilePosition the reported position
     */
    void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition);
}
src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java
@@ -1,5 +1,8 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.utils.DateUtil;
public class GPSMsgInfo {
    /**
@@ -39,6 +42,18 @@
    private boolean stored;
    public static GPSMsgInfo getInstance(MobilePosition mobilePosition) {
        GPSMsgInfo gpsMsgInfo = new GPSMsgInfo();
        gpsMsgInfo.setId(mobilePosition.getChannelId());
        gpsMsgInfo.setAltitude(mobilePosition.getAltitude() + "");
        gpsMsgInfo.setLng(mobilePosition.getLongitude());
        gpsMsgInfo.setLat(mobilePosition.getLatitude());
        gpsMsgInfo.setSpeed(mobilePosition.getSpeed());
        gpsMsgInfo.setDirection(mobilePosition.getDirection() + "");
        gpsMsgInfo.setTime(DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
        return gpsMsgInfo;
    }
    public String getId() {
        return id;
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
@@ -1,16 +1,21 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMobilePositionMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
@@ -35,7 +40,7 @@
    private final static Logger logger = LoggerFactory.getLogger(DeviceChannelServiceImpl.class);
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    private EventPublisher eventPublisher;
    @Autowired
    private IInviteStreamService inviteStreamService;
@@ -45,6 +50,15 @@
    @Autowired
    private DeviceMapper deviceMapper;
    @Autowired
    private DeviceMobilePositionMapper deviceMobilePositionMapper;
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Override
    public DeviceChannel updateGps(DeviceChannel deviceChannel, Device device) {
@@ -84,7 +98,6 @@
    public void updateChannel(String deviceId, DeviceChannel channel) {
        String channelId = channel.getChannelId();
        channel.setDeviceId(deviceId);
//        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
        if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
            channel.setStreamId(inviteInfo.getStreamInfo().getStream());
@@ -280,4 +293,64 @@
        }
        channelMapper.updateChannelStreamIdentification(channel);
    }
    @Override
    public List<DeviceChannel> queryChaneListByDeviceId(String deviceId) {
        return channelMapper.queryAllChannels(deviceId);
    }
    @Override
    public void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition) {
        if (userSetting.getSavePositionHistory()) {
            deviceMobilePositionMapper.insertNewPosition(mobilePosition);
        }
        if (deviceChannel.getChannelId().equals(deviceChannel.getDeviceId())) {
            deviceChannel.setChannelId(null);
        }
        if (deviceChannel.getGpsTime() == null) {
            deviceChannel.setGpsTime(DateUtil.getNow());
        }
        int updated = channelMapper.updatePosition(deviceChannel);
        if (updated == 0) {
            return;
        }
        List<DeviceChannel> deviceChannels = new ArrayList<>();
        if (deviceChannel.getChannelId() == null) {
            // 有的设备这里上报的deviceId与通道Id是一样,这种情况更新设备下的全部通道
            List<DeviceChannel> deviceChannelsInDb = queryChaneListByDeviceId(device.getDeviceId());
            deviceChannels.addAll(deviceChannelsInDb);
        }else {
            deviceChannels.add(deviceChannel);
        }
        if (deviceChannels.isEmpty()) {
            return;
        }
        if (deviceChannels.size() > 100) {
            logger.warn("[更新通道位置信息后发送通知] 设备可能是平台,上报的位置信息未标明通道编号," +
                    "导致所有通道被更新位置, deviceId:{}", device.getDeviceId());
        }
        for (DeviceChannel channel : deviceChannels) {
            // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
            mobilePosition.setChannelId(channel.getChannelId());
            try {
                eventPublisher.mobilePositionEventPublish(mobilePosition);
            }catch (Exception e) {
                logger.error("[向上级转发移动位置失败] ", e);
            }
            // 发送redis消息。 通知位置信息的变化
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
            jsonObject.put("serial", mobilePosition.getDeviceId());
            jsonObject.put("code", mobilePosition.getChannelId());
            jsonObject.put("longitude", mobilePosition.getLongitude());
            jsonObject.put("latitude", mobilePosition.getLatitude());
            jsonObject.put("altitude", mobilePosition.getAltitude());
            jsonObject.put("direction", mobilePosition.getDirection());
            jsonObject.put("speed", mobilePosition.getSpeed());
            redisCatchStorage.sendMobilePositionMsg(jsonObject);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -395,7 +395,7 @@
            "WHERE device_id=#{deviceId} " +
            " <if test='channelId != null' >  AND channel_id=#{channelId}</if>" +
            " </script>"})
    void updatePosition(DeviceChannel deviceChannel);
    int updatePosition(DeviceChannel deviceChannel);
    /**
     * Selects every row of {@code wvp_device_channel} whose {@code stream_id} is non-empty after
     * trimming.
     *
     * @return the matching channels
     */
    @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0")
    List<DeviceChannel> getAllChannelInPlay();