648540858
2024-04-24 2113e8cf271e0d189d4ff9dd2d4d5dd7cba6e3ab
优化notify消息中目录的处理
4个文件已修改
368 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java 289 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
@@ -5,6 +5,7 @@
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@@ -18,6 +19,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@@ -28,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@@ -45,6 +48,8 @@
    private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
    private final List<DeviceChannel> deleteChannelList = new CopyOnWriteArrayList<>();
    private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
    @Autowired
    private UserSetting userSetting;
@@ -65,11 +70,24 @@
    private SipConfig sipConfig;
    @Transactional
    public void process(List<RequestEvent> evtList) {
        if (evtList.isEmpty()) {
    public void process(RequestEvent evt) {
        if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
            logger.error("[notify-目录订阅] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
            return;
        }
        for (RequestEvent evt : evtList) {
        taskQueue.offer(new HandlerCatchData(evt, null, null));
    }
    @Scheduled(fixedRate = 400)   //每400毫秒执行一次
    public void executeTaskQueue(){
        if (taskQueue.isEmpty()) {
            return;
        }
        for (HandlerCatchData take : taskQueue) {
            if (take == null) {
                continue;
            }
            RequestEvent evt = take.getEvt();
            try {
                long start = System.currentTimeMillis();
                FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
@@ -221,6 +239,7 @@
                logger.error("未处理的异常 ", e);
            }
        }
        taskQueue.clear();
        if (!updateChannelMap.keySet().isEmpty()
                || !addChannelMap.keySet().isEmpty()
                || !updateChannelOnlineList.isEmpty()
@@ -295,4 +314,9 @@
            updateChannelOfflineList.clear();
        }
    }
    @Scheduled(fixedRate = 10000)   //每1秒执行一次
    public void execute(){
        logger.info("[待处理Notify-目录订阅消息数量]: {}", taskQueue.size());
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -4,6 +4,7 @@
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@@ -17,6 +18,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
@@ -27,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * SIP命令类型: NOTIFY请求中的移动位置请求处理
@@ -37,6 +40,7 @@
    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class);
    private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
    @Autowired
    private UserSetting userSetting;
@@ -50,14 +54,28 @@
    @Autowired
    private IDeviceChannelService deviceChannelService;
    public void process(RequestEvent evt) {
        if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
            logger.error("[notify-移动位置] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
            return;
        }
        taskQueue.offer(new HandlerCatchData(evt, null, null));
    }
    @Scheduled(fixedRate = 200) //每200毫秒执行一次
    @Transactional
    public void process(List<RequestEvent> eventList) {
        if (eventList.isEmpty()) {
    public void executeTaskQueue() {
        if (taskQueue.isEmpty()) {
            return;
        }
        Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
        List<MobilePosition> addMobilePositionList = new ArrayList<>();
        for (RequestEvent evt : eventList) {
        for (HandlerCatchData take : taskQueue) {
            if (take == null) {
                continue;
            }
            RequestEvent evt = take.getEvt();
            try {
                FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
                String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
@@ -180,10 +198,11 @@
                logger.error("未处理的异常 ", e);
            }
        }
        taskQueue.clear();
        if(!updateChannelMap.isEmpty()) {
            List<DeviceChannel>  channels = new ArrayList<>(updateChannelMap.values());
            logger.info("[移动位置订阅]更新通道位置: {}", channels.size());
            deviceChannelService.batchUpdateChannelGPS(channels);
            deviceChannelService.batchUpdateChannel(channels);
            updateChannelMap.clear();
        }
        if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) {
@@ -196,4 +215,8 @@
            addMobilePositionList.clear();
        }
    }
    @Scheduled(fixedRate = 10000)
    public void execute(){
        logger.info("[待处理Notify-移动位置订阅消息数量]: {}", taskQueue.size());
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -1,12 +1,9 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
@@ -14,9 +11,7 @@
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import gov.nist.javax.sip.message.SIPRequest;
import org.dom4j.DocumentException;
import org.dom4j.Element;
@@ -24,9 +19,6 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
@@ -35,9 +27,6 @@
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * SIP命令类型: NOTIFY请求,这是作为上级发送订阅请求后,设备才会响应的
@@ -47,15 +36,6 @@
    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestProcessor.class);
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private IVideoManagerStorage storager;
    @Autowired
    private EventPublisher eventPublisher;
    @Autowired
    private SipConfig sipConfig;
@@ -80,14 +60,6 @@
    @Autowired
    private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor;
    private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    private int maxQueueCount = 30000;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
@@ -97,187 +69,122 @@
    @Override
    public void process(RequestEvent evt) {
        try {
            if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
                responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null);
                logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
                return;
            } else {
            responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
            Element rootElement = getRootElement(evt);
            if (rootElement == null) {
                logger.error("处理NOTIFY消息时未获取到消息体,{}", evt.getRequest());
                responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
                return;
            }
            String cmd = XmlUtil.getText(rootElement, "CmdType");
            if (CmdType.CATALOG.equals(cmd)) {
                notifyRequestForCatalogProcessor.process(evt);
            } else if (CmdType.ALARM.equals(cmd)) {
                processNotifyAlarm(evt);
            } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
                notifyRequestForMobilePositionProcessor.process(evt);
            } else {
                logger.info("接收到消息:" + cmd);
            }
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("未处理的异常 ", e);
        }
        taskQueue.offer(new HandlerCatchData(evt, null, null));
    }
    @Scheduled(fixedRate = 200)   //每200毫秒执行一次
    public void executeTaskQueue(){
        if (taskQueue.isEmpty()) {
            return;
        }
        try {
            List<RequestEvent> catalogEventList = new ArrayList<>();
            List<RequestEvent> alarmEventList = new ArrayList<>();
            List<RequestEvent> mobilePositionEventList = new ArrayList<>();
            for (HandlerCatchData take : taskQueue) {
                if (take == null) {
                    continue;
                }
                Element rootElement = getRootElement(take.getEvt());
                if (rootElement == null) {
                    logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
                    continue;
                }
                String cmd = XmlUtil.getText(rootElement, "CmdType");
                if (CmdType.CATALOG.equals(cmd)) {
                    catalogEventList.add(take.getEvt());
                } else if (CmdType.ALARM.equals(cmd)) {
                    alarmEventList.add(take.getEvt());
                } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
                    mobilePositionEventList.add(take.getEvt());
                } else {
                    logger.info("接收到消息:" + cmd);
                }
            }
            taskQueue.clear();
            if (!alarmEventList.isEmpty()) {
                processNotifyAlarm(alarmEventList);
            }
            if (!catalogEventList.isEmpty()) {
                notifyRequestForCatalogProcessor.process(catalogEventList);
            }
            if (!mobilePositionEventList.isEmpty()) {
                notifyRequestForMobilePositionProcessor.process(mobilePositionEventList);
            }
        } catch (DocumentException e) {
            logger.error("处理NOTIFY消息时错误", e);
            throw new RuntimeException(e);
        }
    }
    }
    /***
     * 处理alarm设备报警Notify
     */
    private void processNotifyAlarm(List<RequestEvent> evtList) {
    private void processNotifyAlarm(RequestEvent evt) {
        if (!sipConfig.isAlarm()) {
            return;
        }
        if (!evtList.isEmpty()) {
            for (RequestEvent evt : evtList) {
                try {
                    FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
                    String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
        try {
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
                    Element rootElement = getRootElement(evt);
                    if (rootElement == null) {
                        logger.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest());
                        return;
                    }
                    Element deviceIdElement = rootElement.element("DeviceID");
                    String channelId = deviceIdElement.getText().toString();
                    Device device = redisCatchStorage.getDevice(deviceId);
                    if (device == null) {
                        logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId);
                        return;
                    }
                    rootElement = getRootElement(evt, device.getCharset());
                    if (rootElement == null) {
                        logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest());
                        return;
                    }
                    DeviceAlarm deviceAlarm = new DeviceAlarm();
                    deviceAlarm.setDeviceId(deviceId);
                    deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
                    deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
                    String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");
                    if (alarmTime == null) {
                        logger.warn("[ NotifyAlarm ] AlarmTime cannot be null");
                        return;
                    }
                    deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
                    if (XmlUtil.getText(rootElement, "AlarmDescription") == null) {
                        deviceAlarm.setAlarmDescription("");
                    } else {
                        deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription"));
                    }
                    if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) {
                        deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
                    } else {
                        deviceAlarm.setLongitude(0.00);
                    }
                    if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) {
                        deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
                    } else {
                        deviceAlarm.setLatitude(0.00);
                    }
                    logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId());
                    if ("4".equals(deviceAlarm.getAlarmMethod())) {
                        MobilePosition mobilePosition = new MobilePosition();
                        mobilePosition.setChannelId(channelId);
                        mobilePosition.setCreateTime(DateUtil.getNow());
                        mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
                        mobilePosition.setTime(deviceAlarm.getAlarmTime());
                        mobilePosition.setLongitude(deviceAlarm.getLongitude());
                        mobilePosition.setLatitude(deviceAlarm.getLatitude());
                        mobilePosition.setReportSource("GPS Alarm");
                        // 更新device channel 的经纬度
                        DeviceChannel deviceChannel = new DeviceChannel();
                        deviceChannel.setDeviceId(device.getDeviceId());
                        deviceChannel.setChannelId(channelId);
                        deviceChannel.setLongitude(mobilePosition.getLongitude());
                        deviceChannel.setLatitude(mobilePosition.getLatitude());
                        deviceChannel.setGpsTime(mobilePosition.getTime());
                        deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
                        mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
                        mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
                        mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
                        mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
                        deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
                    }
                    // 回复200 OK
                    if (redisCatchStorage.deviceIsOnline(deviceId)) {
                        publisher.deviceAlarmEventPublish(deviceAlarm);
                    }
                } catch (DocumentException e) {
                    logger.error("未处理的异常 ", e);
                }
            Element rootElement = getRootElement(evt);
            if (rootElement == null) {
                logger.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest());
                return;
            }
            Element deviceIdElement = rootElement.element("DeviceID");
            String channelId = deviceIdElement.getText().toString();
            Device device = redisCatchStorage.getDevice(deviceId);
            if (device == null) {
                logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId);
                return;
            }
            rootElement = getRootElement(evt, device.getCharset());
            if (rootElement == null) {
                logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest());
                return;
            }
            DeviceAlarm deviceAlarm = new DeviceAlarm();
            deviceAlarm.setDeviceId(deviceId);
            deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
            deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
            String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");
            if (alarmTime == null) {
                logger.warn("[ NotifyAlarm ] AlarmTime cannot be null");
                return;
            }
            deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
            if (XmlUtil.getText(rootElement, "AlarmDescription") == null) {
                deviceAlarm.setAlarmDescription("");
            } else {
                deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription"));
            }
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) {
                deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
            } else {
                deviceAlarm.setLongitude(0.00);
            }
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) {
                deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
            } else {
                deviceAlarm.setLatitude(0.00);
            }
            logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId());
            if ("4".equals(deviceAlarm.getAlarmMethod())) {
                MobilePosition mobilePosition = new MobilePosition();
                mobilePosition.setChannelId(channelId);
                mobilePosition.setCreateTime(DateUtil.getNow());
                mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
                mobilePosition.setTime(deviceAlarm.getAlarmTime());
                mobilePosition.setLongitude(deviceAlarm.getLongitude());
                mobilePosition.setLatitude(deviceAlarm.getLatitude());
                mobilePosition.setReportSource("GPS Alarm");
                // 更新device channel 的经纬度
                DeviceChannel deviceChannel = new DeviceChannel();
                deviceChannel.setDeviceId(device.getDeviceId());
                deviceChannel.setChannelId(channelId);
                deviceChannel.setLongitude(mobilePosition.getLongitude());
                deviceChannel.setLatitude(mobilePosition.getLatitude());
                deviceChannel.setGpsTime(mobilePosition.getTime());
                deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
                mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
                mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
                mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
                mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
                deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
            }
            // 回复200 OK
            if (redisCatchStorage.deviceIsOnline(deviceId)) {
                publisher.deviceAlarmEventPublish(deviceAlarm);
            }
        } catch (DocumentException e) {
            logger.error("未处理的异常 ", e);
        }
    }
    public void setCmder(SIPCommander cmder) {
    }
    public void setStorager(IVideoManagerStorage storager) {
        this.storager = storager;
    }
    public void setPublisher(EventPublisher publisher) {
        this.publisher = publisher;
    }
    public void setRedis(RedisUtil redis) {
    }
    public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) {
    }
    public IRedisCatchStorage getRedisCatchStorage() {
        return redisCatchStorage;
    }
    public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
        this.redisCatchStorage = redisCatchStorage;
    }
    @Scheduled(fixedRate = 10000)   //每1秒执行一次
    public void execute(){
        logger.info("[待处理Notify消息数量]: {}", taskQueue.size());
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
@@ -275,15 +275,23 @@
    }
    @Override
    public void batchUpdateChannel(List<DeviceChannel> channels) {
    public synchronized void batchUpdateChannel(List<DeviceChannel> channels) {
        String now = DateUtil.getNow();
        for (DeviceChannel channel : channels) {
            channel.setUpdateTime(now);
        }
        channelMapper.batchUpdate(channels);
        for (DeviceChannel channel : channels) {
            if (channel.getParentId() != null) {
                channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId());
        int limitCount = 1000;
        if (!channels.isEmpty()) {
            if (channels.size() > limitCount) {
                for (int i = 0; i < channels.size(); i += limitCount) {
                    int toIndex = i + limitCount;
                    if (i + limitCount > channels.size()) {
                        toIndex = channels.size();
                    }
                    channelMapper.batchUpdate(channels.subList(i, toIndex));
                }
            }else {
                channelMapper.batchUpdate(channels);
            }
        }
    }