src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
@@ -103,6 +103,16 @@ return platforms; }

/**
 * Collects the id of every entry currently held in the mobile-position subscription map.
 *
 * <p>The map's values are iterated directly instead of walking the key set and looking each
 * key up again: with the two-step form an entry removed between {@code keySet()} and
 * {@code get(key)} yields {@code null} and the following {@code getId()} call throws.
 *
 * @return a new, mutable list of subscription ids; empty when nothing is subscribed
 */
public List<String> getAllMobilePositionSubscribePlatform() {
    List<String> platforms = new ArrayList<>();
    mobilePositionMap.values().forEach(subscribe -> {
        // Defensive: skip null values should the backing map permit them.
        if (subscribe != null) {
            platforms.add(subscribe.getId());
        }
    });
    return platforms;
}

public void removeAllSubscribe(String platformId) { removeMobilePositionSubscribe(platformId); removeCatalogSubscribe(platformId); src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
@@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.event.device.RequestTimeoutEvent; import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent; import com.genersoft.iot.vmp.media.zlm.event.ZLMOfflineEvent; import com.genersoft.iot.vmp.media.zlm.event.ZLMOnlineEvent; import org.springframework.beans.factory.annotation.Autowired; @@ -94,6 +95,13 @@ }

/**
 * Publishes a {@link MobilePositionEvent} carrying the given position.
 *
 * <p>The event is created with this publisher as its source and handed to
 * {@code applicationEventPublisher}.
 *
 * @param mobilePosition the position to attach to the event
 */
public void mobilePositionEventPublish(MobilePosition mobilePosition) {
    final MobilePositionEvent positionEvent = new MobilePositionEvent(this);
    positionEvent.setMobilePosition(mobilePosition);

    applicationEventPublisher.publishEvent(positionEvent);
}

public void catalogEventPublishForStream(String platformId, List<GbStream> gbStreams, String type) { CatalogEvent outEvent = new CatalogEvent(this); outEvent.setGbStreams(gbStreams); src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java
New file @@ -0,0 +1,20 @@
package com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition;

import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import org.springframework.context.ApplicationEvent;

/**
 * Application event that carries a single {@link MobilePosition}.
 */
public class MobilePositionEvent extends ApplicationEvent {

    /** Position attached to this event; unset until {@link #setMobilePosition} is called. */
    private MobilePosition mobilePosition;

    /**
     * Creates an event without a position attached.
     *
     * @param source the object on which the event initially occurred
     */
    public MobilePositionEvent(Object source) {
        super(source);
    }

    /**
     * @param mobilePosition the position to attach to this event
     */
    public void setMobilePosition(MobilePosition mobilePosition) {
        this.mobilePosition = mobilePosition;
    }

    /**
     * @return the attached position, or {@code null} when none has been set
     */
    public MobilePosition getMobilePosition() {
        return this.mobilePosition;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java
New file @@ -0,0 +1,61 @@
package com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition;

import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.List;

/**
 * Forwards mobile-position notifications to the upstream platforms that subscribed to them.
 */
@Component
public class MobilePositionEventLister implements ApplicationListener<MobilePositionEvent> {

    private final static Logger logger = LoggerFactory.getLogger(MobilePositionEventLister.class);

    @Autowired
    private IVideoManagerStorage storager;

    @Autowired
    private SIPCommanderFroPlatform sipCommanderFroPlatform;

    @Autowired
    private SubscribeHolder subscribeHolder;

    /**
     * Sends the event's position to every platform returned by the storage lookup for the
     * event's channel, restricted to platforms holding a mobile-position subscription.
     *
     * <p>A failure to notify one platform is logged and does not stop the remaining platforms
     * from being notified.
     *
     * @param event the event carrying the position; ignored when it has no position attached
     */
    @Override
    public void onApplicationEvent(MobilePositionEvent event) {
        // Nothing to forward when the event was published without a position.
        if (event.getMobilePosition() == null) {
            return;
        }
        // Fetch all platforms that currently hold a mobile-position subscription.
        List<String> platforms = subscribeHolder.getAllMobilePositionSubscribePlatform();
        if (platforms.isEmpty()) {
            return;
        }
        List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(
                event.getMobilePosition().getChannelId(), platforms);

        for (ParentPlatform platform : parentPlatformsForGB) {
            SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId());
            if (subscribe == null) {
                // The subscription may have been removed since the platform list was taken;
                // without it there is nothing to notify against.
                continue;
            }
            logger.info("[向上级发送MobilePosition] 通道:{},平台:{}, 位置: {}:{}", event.getMobilePosition().getChannelId(),
                    platform.getServerGBId(), event.getMobilePosition().getLongitude(),
                    event.getMobilePosition().getLatitude());
            try {
                sipCommanderFroPlatform.sendNotifyMobilePosition(platform,
                        GPSMsgInfo.getInstance(event.getMobilePosition()), subscribe);
            } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException
                     | IllegalAccessException e) {
                // Pass the exception as the trailing argument so the stack trace is kept.
                logger.error("[命令发送失败] 国标级联 MobilePosition通知: {}", e.getMessage(), e);
            }
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -1,7 +1,5 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.CivilCodeFileConf; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; @@ -78,9 +76,6 @@ @Autowired private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor; @Autowired private CivilCodeFileConf civilCodeFileConf; private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @@ -98,7 +93,6 @@ @Override public void process(RequestEvent evt) { try { if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null); logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); @@ -234,25 +228,8 @@ mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); if (userSetting.getSavePositionHistory()) { storager.insertMobilePosition(mobilePosition); } deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); storager.updateChannelPosition(deviceChannel); // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", deviceId); jsonObject.put("code", channelId); jsonObject.put("longitude", mobilePosition.getLongitude()); jsonObject.put("latitude", mobilePosition.getLatitude()); jsonObject.put("altitude", mobilePosition.getAltitude()); jsonObject.put("direction", mobilePosition.getDirection()); jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); } catch (DocumentException e) { logger.error("未处理的异常 ", e); } @@ -340,25 +317,8 @@ 
mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); if (userSetting.getSavePositionHistory()) { storager.insertMobilePosition(mobilePosition); } storager.updateChannelPosition(deviceChannel); // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", deviceChannel.getDeviceId()); jsonObject.put("code", deviceChannel.getChannelId()); jsonObject.put("longitude", mobilePosition.getLongitude()); jsonObject.put("latitude", mobilePosition.getLatitude()); jsonObject.put("altitude", mobilePosition.getAltitude()); jsonObject.put("direction", mobilePosition.getDirection()); jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); } // TODO: 需要实现存储报警信息、报警分类 // 回复200 OK if (redisCatchStorage.deviceIsOnline(deviceId)) { src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
@@ -75,6 +75,9 @@ @Autowired private ThreadPoolTaskExecutor taskExecutor; @Autowired private EventPublisher eventPublisher; @Override public void afterPropertiesSet() throws Exception { @@ -158,22 +161,7 @@ mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); if (userSetting.getSavePositionHistory()) { storager.insertMobilePosition(mobilePosition); } storager.updateChannelPosition(deviceChannel); // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", deviceChannel.getDeviceId()); jsonObject.put("code", deviceChannel.getChannelId()); jsonObject.put("longitude", mobilePosition.getLongitude()); jsonObject.put("latitude", mobilePosition.getLatitude()); jsonObject.put("altitude", mobilePosition.getAltitude()); jsonObject.put("direction", mobilePosition.getDirection()); jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); } } if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) { src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
@@ -1,8 +1,8 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; @@ -56,6 +56,9 @@ @Autowired private IDeviceChannelService deviceChannelService; @Autowired private EventPublisher eventPublisher; private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>(); @@ -137,22 +140,7 @@ mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); if (userSetting.getSavePositionHistory()) { storager.insertMobilePosition(mobilePosition); } storager.updateChannelPosition(deviceChannel); // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", deviceChannel.getDeviceId()); jsonObject.put("code", deviceChannel.getChannelId()); jsonObject.put("longitude", mobilePosition.getLongitude()); jsonObject.put("latitude", mobilePosition.getLatitude()); jsonObject.put("altitude", mobilePosition.getAltitude()); jsonObject.put("direction", mobilePosition.getDirection()); jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); } catch (DocumentException e) { logger.error("未处理的异常 ", e); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java
@@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; @@ -131,11 +130,7 @@ mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); if (userSetting.getSavePositionHistory()) { storager.insertMobilePosition(mobilePosition); } storager.updateChannelPosition(deviceChannel); deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); String key = DeferredResultHolder.CALLBACK_CMD_MOBILE_POSITION + device.getDeviceId(); RequestMessage msg = new RequestMessage(); @@ -143,17 +138,6 @@ msg.setData(mobilePosition); resultHolder.invokeAllResult(msg); // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", deviceChannel.getDeviceId()); jsonObject.put("code", deviceChannel.getChannelId()); jsonObject.put("longitude", mobilePosition.getLongitude()); jsonObject.put("latitude", mobilePosition.getLatitude()); jsonObject.put("altitude", mobilePosition.getAltitude()); jsonObject.put("direction", mobilePosition.getDirection()); jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); //回复 200 OK try { responseAck(request, Response.OK); src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
@@ -2,6 +2,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; @@ -92,4 +93,9 @@ * 修改通道的码流类型 */ void updateChannelStreamIdentification(DeviceChannel channel);
/**
 * Lists the channels belonging to a device.
 *
 * @param deviceId id of the device whose channels are requested
 * @return the channels found for that device
 */
List<DeviceChannel> queryChaneListByDeviceId(String deviceId);
/**
 * Applies a reported position to a channel and announces the change.
 *
 * @param device         the device the report came from
 * @param deviceChannel  the channel carrying the new position values
 * @param mobilePosition the reported position
 */
void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition); } src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java
@@ -1,5 +1,8 @@ package com.genersoft.iot.vmp.service.bean; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.utils.DateUtil; public class GPSMsgInfo { /** @@ -39,6 +42,18 @@ private boolean stored; public static GPSMsgInfo getInstance(MobilePosition mobilePosition) { GPSMsgInfo gpsMsgInfo = new GPSMsgInfo(); gpsMsgInfo.setId(mobilePosition.getChannelId()); gpsMsgInfo.setAltitude(mobilePosition.getAltitude() + ""); gpsMsgInfo.setLng(mobilePosition.getLongitude()); gpsMsgInfo.setLat(mobilePosition.getLatitude()); gpsMsgInfo.setSpeed(mobilePosition.getSpeed()); gpsMsgInfo.setDirection(mobilePosition.getDirection() + ""); gpsMsgInfo.setTime(DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); return gpsMsgInfo; } public String getId() { return id; src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
@@ -1,16 +1,21 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson2.JSONObject; import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.storager.dao.DeviceMapper; import com.genersoft.iot.vmp.storager.dao.DeviceMobilePositionMapper; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; @@ -35,7 +40,7 @@ private final static Logger logger = LoggerFactory.getLogger(DeviceChannelServiceImpl.class); @Autowired private IRedisCatchStorage redisCatchStorage; private EventPublisher eventPublisher; @Autowired private IInviteStreamService inviteStreamService; @@ -45,6 +50,15 @@ @Autowired private DeviceMapper deviceMapper; @Autowired private DeviceMobilePositionMapper deviceMobilePositionMapper; @Autowired private UserSetting userSetting; @Autowired private IRedisCatchStorage redisCatchStorage; @Override public DeviceChannel updateGps(DeviceChannel deviceChannel, Device device) { @@ -84,7 +98,6 @@ public void updateChannel(String deviceId, DeviceChannel channel) { String channelId = channel.getChannelId(); channel.setDeviceId(deviceId); // StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); InviteInfo inviteInfo 
= inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); if (inviteInfo != null && inviteInfo.getStreamInfo() != null) { channel.setStreamId(inviteInfo.getStreamInfo().getStream()); @@ -280,4 +293,64 @@ } channelMapper.updateChannelStreamIdentification(channel); } @Override public List<DeviceChannel> queryChaneListByDeviceId(String deviceId) { return channelMapper.queryAllChannels(deviceId); } @Override public void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition) { if (userSetting.getSavePositionHistory()) { deviceMobilePositionMapper.insertNewPosition(mobilePosition); } if (deviceChannel.getChannelId().equals(deviceChannel.getDeviceId())) { deviceChannel.setChannelId(null); } if (deviceChannel.getGpsTime() == null) { deviceChannel.setGpsTime(DateUtil.getNow()); } int updated = channelMapper.updatePosition(deviceChannel); if (updated == 0) { return; } List<DeviceChannel> deviceChannels = new ArrayList<>(); if (deviceChannel.getChannelId() == null) { // 有的设备这里上报的deviceId与通道Id是一样,这种情况更新设备下的全部通道 List<DeviceChannel> deviceChannelsInDb = queryChaneListByDeviceId(device.getDeviceId()); deviceChannels.addAll(deviceChannelsInDb); }else { deviceChannels.add(deviceChannel); } if (deviceChannels.isEmpty()) { return; } if (deviceChannels.size() > 100) { logger.warn("[更新通道位置信息后发送通知] 设备可能是平台,上报的位置信息未标明通道编号," + "导致所有通道被更新位置, deviceId:{}", device.getDeviceId()); } for (DeviceChannel channel : deviceChannels) { // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 mobilePosition.setChannelId(channel.getChannelId()); try { eventPublisher.mobilePositionEventPublish(mobilePosition); }catch (Exception e) { logger.error("[向上级转发移动位置失败] ", e); } // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", mobilePosition.getDeviceId()); jsonObject.put("code", mobilePosition.getChannelId()); 
jsonObject.put("longitude", mobilePosition.getLongitude()); jsonObject.put("latitude", mobilePosition.getLatitude()); jsonObject.put("altitude", mobilePosition.getAltitude()); jsonObject.put("direction", mobilePosition.getDirection()); jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); } } } src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -395,7 +395,7 @@ "WHERE device_id=#{deviceId} " + " <if test='channelId != null' > AND channel_id=#{channelId}</if>" + " </script>"}) void updatePosition(DeviceChannel deviceChannel); int updatePosition(DeviceChannel deviceChannel); @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0") List<DeviceChannel> getAllChannelInPlay();