package com.genersoft.iot.vmp.service.impl; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.DeviceMapper; import com.genersoft.iot.vmp.utils.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import java.time.Instant; import java.util.List; import java.util.concurrent.TimeUnit; /** * 设备业务(目录订阅) */ @Service public class DeviceServiceImpl implements IDeviceService { private final static Logger logger = LoggerFactory.getLogger(DeviceServiceImpl.class); private final String registerExpireTaskKeyPrefix = "device-register-expire-"; @Autowired private DynamicTask dynamicTask; @Autowired private ISIPCommander sipCommander; @Autowired private CatalogResponseMessageHandler catalogResponseMessageHandler; @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private DeviceMapper deviceMapper; @Autowired private ISIPCommander commander; @Autowired private VideoStreamSessionManager streamSession; @Autowired private IMediaServerService mediaServerService; @Override public void online(Device device) { logger.info("[设备上线] deviceId:{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort()); Device deviceInRedis = redisCatchStorage.getDevice(device.getDeviceId()); Device deviceInDb = deviceMapper.getDeviceByDeviceId(device.getDeviceId()); String now = DateUtil.getNow(); if (deviceInRedis != null && deviceInDb == null) { // redis 存在脏数据 redisCatchStorage.clearCatchByDeviceId(device.getDeviceId()); } device.setUpdateTime(now); device.setOnline(1); // 第一次上线 if (device.getCreateTime() == null) { device.setCreateTime(now); logger.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId()); deviceMapper.add(device); redisCatchStorage.updateDevice(device); commander.deviceInfoQuery(device); sync(device); }else { deviceMapper.update(device); redisCatchStorage.updateDevice(device); } // 上线添加订阅 if (device.getSubscribeCycleForCatalog() > 0) { // 查询在线设备那些开启了订阅,为设备开启定时的目录订阅 addCatalogSubscribe(device); } if (device.getSubscribeCycleForMobilePosition() > 0) { addMobilePositionSubscribe(device); } // 刷新过期任务 String registerExpireTaskKey = registerExpireTaskKeyPrefix + device.getDeviceId(); dynamicTask.stop(registerExpireTaskKey); dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId()), device.getExpires() * 1000); } @Override public void offline(String deviceId) { Device device = deviceMapper.getDeviceByDeviceId(deviceId); if (device == null) { return; } String registerExpireTaskKey = registerExpireTaskKeyPrefix + deviceId; dynamicTask.stop(registerExpireTaskKey); device.setOnline(0); redisCatchStorage.updateDevice(device); deviceMapper.update(device); // 离线释放所有ssrc List ssrcTransactions = streamSession.getSsrcTransactionForAll(deviceId, null, null, null); if (ssrcTransactions != null && ssrcTransactions.size() > 0) { for (SsrcTransaction ssrcTransaction : ssrcTransactions) { mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); mediaServerService.closeRTPServer(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); streamSession.remove(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); } } // 移除订阅 removeCatalogSubscribe(device); removeMobilePositionSubscribe(device); } @Override public boolean addCatalogSubscribe(Device device) { if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } logger.info("[添加目录订阅] 设备{}", device.getDeviceId()); // 添加目录订阅 CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander, dynamicTask); // 提前开始刷新订阅 int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForCatalog(),30); // 设置最小值为30 dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, (subscribeCycleForCatalog -1) * 1000); return true; } @Override public boolean removeCatalogSubscribe(Device device) { if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } logger.info("移除目录订阅: {}", device.getDeviceId()); dynamicTask.stop(device.getDeviceId() + "catalog"); return true; } @Override public boolean addMobilePositionSubscribe(Device device) { if (device == null || device.getSubscribeCycleForMobilePosition() < 0) { return false; } logger.info("[添加移动位置订阅] 设备{}", device.getDeviceId()); // 添加目录订阅 MobilePositionSubscribeTask mobilePositionSubscribeTask = new MobilePositionSubscribeTask(device, sipCommander, dynamicTask); // 设置最小值为30 int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForMobilePosition(),30); // 提前开始刷新订阅 dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, (subscribeCycleForCatalog -1 ) * 1000); return true; } @Override public boolean removeMobilePositionSubscribe(Device device) { if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } logger.info("移除移动位置订阅: {}", device.getDeviceId()); dynamicTask.stop(device.getDeviceId() + "mobile_position"); return true; } @Override public SyncStatus getChannelSyncStatus(String deviceId) { return catalogResponseMessageHandler.getChannelSyncProgress(deviceId); } @Override public Boolean isSyncRunning(String deviceId) { return catalogResponseMessageHandler.isSyncRunning(deviceId); } @Override public void sync(Device device) { if (catalogResponseMessageHandler.isSyncRunning(device.getDeviceId())) { logger.info("开启同步时发现同步已经存在"); return; } int sn = (int)((Math.random()*9+1)*100000); catalogResponseMessageHandler.setChannelSyncReady(device, sn); sipCommander.catalogQuery(device, sn, event -> { String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg); catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg); }); } @Override public Device queryDevice(String deviceId) { return deviceMapper.getDeviceByDeviceId(deviceId); } @Override public List getAllOnlineDevice() { return deviceMapper.getOnlineDevices(); } @Override public boolean expire(Device device) { Instant registerTimeDate = Instant.from(DateUtil.formatter.parse(device.getRegisterTime())); Instant expireInstant = registerTimeDate.plusMillis(TimeUnit.SECONDS.toMillis(device.getExpires())); return expireInstant.isBefore(Instant.now()); } @Override public void checkDeviceStatus(Device device) { if (device == null || device.getOnline() == 0) { return; } sipCommander.deviceStatusQuery(device, null); } @Override public Device getDeviceByHostAndPort(String host, int port) { return deviceMapper.getDeviceByHostAndPort(host, port); } @Override public void updateDevice(Device device) { Device deviceInStore = deviceMapper.getDeviceByDeviceId(device.getDeviceId()); if (deviceInStore == null) { logger.warn("更新设备时未找到设备信息"); return; } if (!StringUtils.isEmpty(device.getName())) { deviceInStore.setName(device.getName()); } if (!StringUtils.isEmpty(device.getCharset())) { deviceInStore.setCharset(device.getCharset()); } if (!StringUtils.isEmpty(device.getMediaServerId())) { deviceInStore.setMediaServerId(device.getMediaServerId()); } // 目录订阅相关的信息 if (device.getSubscribeCycleForCatalog() > 0) { if (deviceInStore.getSubscribeCycleForCatalog() == 0 || deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) { deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); // 开启订阅 addCatalogSubscribe(deviceInStore); } }else if (device.getSubscribeCycleForCatalog() == 0) { if (deviceInStore.getSubscribeCycleForCatalog() != 0) { deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); // 取消订阅 removeCatalogSubscribe(deviceInStore); } } // 移动位置订阅相关的信息 if (device.getSubscribeCycleForMobilePosition() > 0) { if (deviceInStore.getSubscribeCycleForMobilePosition() == 0 || deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) { deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval()); deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); // 开启订阅 addMobilePositionSubscribe(deviceInStore); } }else if (device.getSubscribeCycleForMobilePosition() == 0) { if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) { // 取消订阅 removeMobilePositionSubscribe(deviceInStore); } } String now = DateUtil.getNow(); device.setUpdateTime(now); device.setCharset(device.getCharset().toUpperCase()); device.setUpdateTime(DateUtil.getNow()); if (deviceMapper.update(device) > 0) { redisCatchStorage.updateDevice(device); } } }