src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java
@@ -45,10 +45,21 @@ for (GbStream gbStream : gbStreams) { String gbId = gbStream.getGbId(); GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId); if (gpsMsgInfo != null && gbStream.isStatus()) { // 发送GPS消息 sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe); if (gbStream.isStatus()) { if (gpsMsgInfo != null) { // 发送GPS消息 sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe); }else { // 没有在redis找到新的消息就使用数据库的消息 gpsMsgInfo = new GPSMsgInfo(); gpsMsgInfo.setId(gbId); gpsMsgInfo.setLat(gbStream.getLongitude()); gpsMsgInfo.setLng(gbStream.getLongitude()); // 发送GPS消息 sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe); } } } } } src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java
New file @@ -0,0 +1,39 @@ package com.genersoft.iot.vmp.service; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; /** * 定时查找redis中的GPS推送消息,并保存到对应的流中 */ @Component public class StreamGPSSubscribeTask { @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private IVideoManagerStorager storager; @Scheduled(fixedRate = 30 * 1000) //每30秒执行一次 public void execute(){ List<GPSMsgInfo> gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo(); if (gpsMsgInfo.size() > 0) { storager.updateStreamGPS(gpsMsgInfo); for (GPSMsgInfo msgInfo : gpsMsgInfo) { msgInfo.setStored(true); redisCatchStorage.updateGpsMsgInfo(msgInfo); } } } } src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java
@@ -37,6 +37,8 @@ */ private String altitude; private boolean stored; public String getId() { return id; @@ -93,4 +95,12 @@ public void setAltitude(String altitude) { this.altitude = altitude; } public boolean isStored() { return stored; } public void setStored(boolean stored) { this.stored = stored; } } src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java
@@ -17,7 +17,6 @@ @Override public void onMessage(Message message, byte[] bytes) { GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class); System.out.println(JSON.toJSON(gpsMsgInfo)); redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); } } src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -195,6 +195,7 @@ void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo); GPSMsgInfo getGpsMsgInfo(String gbId); List<GPSMsgInfo> getAllGpsMsgInfo(); Long getSN(String method); src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java
@@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; import com.github.pagehelper.PageInfo; @@ -456,4 +457,6 @@ List<PlatformCatalog> queryCatalogInPlatform(String serverGBId); int delRelation(PlatformCatalog platformCatalog); int updateStreamGPS(List<GPSMsgInfo> gpsMsgInfo); } src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -3,6 +3,7 @@ import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; @@ -94,4 +95,13 @@ void batchAdd(List<StreamPushItem> subList); @Update({"<script>" + "<foreach collection='gpsMsgInfos' item='item' separator=';'>" + " UPDATE" + " gb_stream" + " SET longitude=${item.lng}, latitude=${item.lat} " + "WHERE gbId=#{item.id}"+ "</foreach>" + "</script>"}) int updateStreamGPS(List<GPSMsgInfo> gpsMsgInfos); } src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -453,7 +453,7 @@ @Override public void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo) { String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gpsMsgInfo.getId(); redis.set(key, gpsMsgInfo); redis.set(key, gpsMsgInfo, 60); // 默认GPS消息保存1分钟 } @Override @@ -476,4 +476,20 @@ public void delSubscribe(String key) { redis.del(key); } @Override public List<GPSMsgInfo> getAllGpsMsgInfo() { String scanKey = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_*"; List<GPSMsgInfo> result = new ArrayList<>(); List<Object> keys = redis.scan(scanKey); for (int i = 0; i < keys.size(); i++) { String key = (String) keys.get(i); GPSMsgInfo gpsMsgInfo = (GPSMsgInfo) redis.get(key); if (!gpsMsgInfo.isStored()) { // 只取没有存过得 result.add((GPSMsgInfo)redis.get(key)); } } return result; } } src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
@@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.storager.dao.*; @@ -898,4 +899,9 @@ } return 0; } @Override public int updateStreamGPS(List<GPSMsgInfo> gpsMsgInfos) { return gbStreamMapper.updateStreamGPS(gpsMsgInfos); } }