| | |
| | | package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; |
| | | |
| | | import com.genersoft.iot.vmp.conf.DynamicTask; |
| | | import com.genersoft.iot.vmp.conf.SipConfig; |
| | | import com.genersoft.iot.vmp.conf.UserSetting; |
| | | import com.genersoft.iot.vmp.gb28181.bean.Device; |
| | | import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; |
| | | import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; |
| | | import com.genersoft.iot.vmp.gb28181.event.EventPublisher; |
| | | import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; |
| | |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.scheduling.annotation.Scheduled; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.transaction.annotation.Transactional; |
| | | |
| | | import javax.sip.RequestEvent; |
| | | import javax.sip.header.FromHeader; |
| | | import java.util.ArrayList; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | |
| | | /** |
| | |
| | | |
| | | private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class); |
| | | |
| | | private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>(); |
| | | private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>(); |
| | | private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); |
| | | |
| | | private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>(); |
| | | private final List<DeviceChannel> deleteChannelList = new CopyOnWriteArrayList<>(); |
| | | |
| | | private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); |
| | | |
| | | @Autowired |
| | | private UserSetting userSetting; |
| | |
| | | private IDeviceChannelService deviceChannelService; |
| | | |
| | | @Autowired |
| | | private DynamicTask dynamicTask; |
| | | |
| | | @Autowired |
| | | private SipConfig sipConfig; |
| | | |
| | | @Transactional |
| | | public void process(List<RequestEvent> evtList) { |
| | | if (evtList.isEmpty()) { |
| | | public void process(RequestEvent evt) { |
| | | if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { |
| | | logger.error("[notify-目录订阅] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); |
| | | return; |
| | | } |
| | | for (RequestEvent evt : evtList) { |
| | | taskQueue.offer(new HandlerCatchData(evt, null, null)); |
| | | } |
| | | |
| | | @Scheduled(fixedRate = 400) //每400毫秒执行一次 |
| | | public void executeTaskQueue(){ |
| | | if (taskQueue.isEmpty()) { |
| | | return; |
| | | } |
| | | for (HandlerCatchData take : taskQueue) { |
| | | if (take == null) { |
| | | continue; |
| | | } |
| | | RequestEvent evt = take.getEvt(); |
| | | try { |
| | | long start = System.currentTimeMillis(); |
| | | FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); |
| | |
| | | |
| | | Device device = redisCatchStorage.getDevice(deviceId); |
| | | if (device == null || !device.isOnLine()) { |
| | | logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" )); |
| | | logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId() : "")); |
| | | return; |
| | | } |
| | | Element rootElement = getRootElement(evt, device.getCharset()); |
| | |
| | | Element eventElement = itemDevice.element("Event"); |
| | | String event; |
| | | if (eventElement == null) { |
| | | logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" )); |
| | | logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId() : "")); |
| | | event = CatalogEvent.ADD; |
| | | }else { |
| | | } else { |
| | | event = eventElement.getText().toUpperCase(); |
| | | } |
| | | DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); |
| | |
| | | channel.setParentId(null); |
| | | } |
| | | channel.setDeviceId(device.getDeviceId()); |
| | | logger.info("[收到目录订阅]:{}, {}/{}",event, device.getDeviceId(), channel.getChannelId()); |
| | | logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); |
| | | switch (event) { |
| | | case CatalogEvent.ON: |
| | | // 上线 |
| | | deviceChannelService.online(channel); |
| | | logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); |
| | | updateChannelOnlineList.add(channel); |
| | | if (userSetting.getDeviceStatusNotify()) { |
| | | // 发送redis消息 |
| | | redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); |
| | | } |
| | | |
| | | break; |
| | | case CatalogEvent.OFF : |
| | | case CatalogEvent.VLOST: |
| | | case CatalogEvent.DEFECT: |
| | | case CatalogEvent.OFF: |
| | | // 离线 |
| | | logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); |
| | | if (userSetting.getRefuseChannelStatusChannelFormNotify()) { |
| | | logger.info("[目录订阅] 离线 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); |
| | | }else { |
| | | deviceChannelService.offline(channel); |
| | | logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); |
| | | } else { |
| | | updateChannelOfflineList.add(channel); |
| | | if (userSetting.getDeviceStatusNotify()) { |
| | | // 发送redis消息 |
| | | redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); |
| | | } |
| | | } |
| | | break; |
| | | case CatalogEvent.VLOST: |
| | | // 视频丢失 |
| | | logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); |
| | | if (userSetting.getRefuseChannelStatusChannelFormNotify()) { |
| | | logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); |
| | | } else { |
| | | updateChannelOfflineList.add(channel); |
| | | if (userSetting.getDeviceStatusNotify()) { |
| | | // 发送redis消息 |
| | | redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); |
| | | } |
| | | } |
| | | break; |
| | | case CatalogEvent.DEFECT: |
| | | // 故障 |
| | | logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); |
| | | if (userSetting.getRefuseChannelStatusChannelFormNotify()) { |
| | | logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); |
| | | } else { |
| | | updateChannelOfflineList.add(channel); |
| | | if (userSetting.getDeviceStatusNotify()) { |
| | | // 发送redis消息 |
| | | redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); |
| | | } |
| | | } |
| | | break; |
| | | case CatalogEvent.ADD: |
| | | // 增加 |
| | | logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); |
| | | // 判断此通道是否存在 |
| | | DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId()); |
| | | if (deviceChannel != null) { |
| | | logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); |
| | | channel.setId(deviceChannel.getId()); |
| | | channel.setHasAudio(null); |
| | | updateChannelMap.put(channel.getChannelId(), channel); |
| | | } else { |
| | | addChannelMap.put(channel.getChannelId(), channel); |
| | | if (userSetting.getDeviceStatusNotify()) { |
| | | // 发送redis消息 |
| | | redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); |
| | | } |
| | | } |
| | | |
| | | break; |
| | | case CatalogEvent.DEL: |
| | | // 删除 |
| | | deviceChannelService.delete(channel); |
| | | logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); |
| | | deleteChannelList.add(channel); |
| | | if (userSetting.getDeviceStatusNotify()) { |
| | | // 发送redis消息 |
| | | redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); |
| | | } |
| | | break; |
| | | case CatalogEvent.ADD: |
| | | case CatalogEvent.UPDATE: |
| | | // 更新 |
| | | channel.setUpdateTime(DateUtil.getNow()); |
| | | channel.setHasAudio(null); |
| | | deviceChannelService.updateChannel(deviceId,channel); |
| | | if (userSetting.getDeviceStatusNotify()) { |
| | | // 发送redis消息 |
| | | redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); |
| | | logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); |
| | | // 判断此通道是否存在 |
| | | DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId()); |
| | | if (deviceChannelForUpdate != null) { |
| | | channel.setId(deviceChannelForUpdate.getId()); |
| | | channel.setUpdateTime(DateUtil.getNow()); |
| | | channel.setHasAudio(null); |
| | | updateChannelMap.put(channel.getChannelId(), channel); |
| | | } else { |
| | | addChannelMap.put(channel.getChannelId(), channel); |
| | | if (userSetting.getDeviceStatusNotify()) { |
| | | // 发送redis消息 |
| | | redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); |
| | | } |
| | | } |
| | | break; |
| | | default: |
| | | logger.warn("[ NotifyCatalog ] event not found : {}", event ); |
| | | logger.warn("[ NotifyCatalog ] event not found : {}", event); |
| | | |
| | | } |
| | | // 转发变化信息 |
| | | eventPublisher.catalogEventPublish(null, channel, event); |
| | | } |
| | | } |
| | | |
| | | } catch (DocumentException e) { |
| | | logger.error("未处理的异常 ", e); |
| | | } |
| | | } |
| | | taskQueue.clear(); |
| | | if (!updateChannelMap.keySet().isEmpty() |
| | | || !addChannelMap.keySet().isEmpty() |
| | | || !updateChannelOnlineList.isEmpty() |
| | | || !updateChannelOfflineList.isEmpty() |
| | | || !deleteChannelList.isEmpty()) { |
| | | executeSave(); |
| | | } |
| | | } |
| | | |
| | | public void executeSave(){ |
| | | try { |
| | | executeSaveForAdd(); |
| | | } catch (Exception e) { |
| | | logger.error("[存储收到的增加通道] 异常: ", e ); |
| | | } |
| | | try { |
| | | executeSaveForOnline(); |
| | | } catch (Exception e) { |
| | | logger.error("[存储收到的通道上线] 异常: ", e ); |
| | | } |
| | | try { |
| | | executeSaveForOffline(); |
| | | } catch (Exception e) { |
| | | logger.error("[存储收到的通道离线] 异常: ", e ); |
| | | } |
| | | try { |
| | | executeSaveForUpdate(); |
| | | } catch (Exception e) { |
| | | logger.error("[存储收到的更新通道] 异常: ", e ); |
| | | } |
| | | try { |
| | | executeSaveForDelete(); |
| | | } catch (Exception e) { |
| | | logger.error("[存储收到的删除通道] 异常: ", e ); |
| | | } |
| | | } |
| | | |
| | | private void executeSaveForUpdate(){ |
| | | if (!updateChannelMap.values().isEmpty()) { |
| | | logger.info("[存储收到的更新通道], 数量: {}", updateChannelMap.size()); |
| | | ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); |
| | | deviceChannelService.batchUpdateChannel(deviceChannels); |
| | | updateChannelMap.clear(); |
| | | } |
| | | } |
| | | |
| | | private void executeSaveForAdd(){ |
| | | if (!addChannelMap.values().isEmpty()) { |
| | | ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values()); |
| | | addChannelMap.clear(); |
| | | deviceChannelService.batchAddChannel(deviceChannels); |
| | | } |
| | | } |
| | | |
| | | private void executeSaveForDelete(){ |
| | | if (!deleteChannelList.isEmpty()) { |
| | | deviceChannelService.deleteChannels(deleteChannelList); |
| | | deleteChannelList.clear(); |
| | | } |
| | | } |
| | | |
| | | private void executeSaveForOnline(){ |
| | | if (!updateChannelOnlineList.isEmpty()) { |
| | | deviceChannelService.channelsOnline(updateChannelOnlineList); |
| | | updateChannelOnlineList.clear(); |
| | | } |
| | | } |
| | | |
| | | private void executeSaveForOffline(){ |
| | | if (!updateChannelOfflineList.isEmpty()) { |
| | | deviceChannelService.channelsOffline(updateChannelOfflineList); |
| | | updateChannelOfflineList.clear(); |
| | | } |
| | | } |
| | | |
| | | // @Scheduled(fixedRate = 10000) //每1秒执行一次 |
| | | // public void execute(){ |
| | | // logger.info("[待处理Notify-目录订阅消息数量]: {}", taskQueue.size()); |
| | | // } |
| | | } |