src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
@@ -31,6 +31,8 @@ private Boolean logInDatebase = Boolean.TRUE; private Boolean usePushingAsStatus = Boolean.TRUE; private String serverId = "000000"; private String thirdPartyGBIdReg = "[\\s\\S]*"; @@ -136,4 +138,12 @@ public void setPlatformPlayTimeout(int platformPlayTimeout) { this.platformPlayTimeout = platformPlayTimeout; } public Boolean isUsePushingAsStatus() { return usePushingAsStatus; } public void setUsePushingAsStatus(Boolean usePushingAsStatus) { this.usePushingAsStatus = usePushingAsStatus; } } src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
@@ -3,7 +3,7 @@ import com.alibaba.fastjson.parser.ParserConfig; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.service.impl.*; import com.genersoft.iot.vmp.service.redisMsg.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.context.annotation.Bean; src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipMsgInfo.java
New file @@ -0,0 +1,56 @@ package com.genersoft.iot.vmp.gb28181.bean; import org.dom4j.Element; import javax.sip.RequestEvent; public class SipMsgInfo { private RequestEvent evt; private Device device; private ParentPlatform platform; private Element rootElement; public SipMsgInfo(RequestEvent evt, Device device, Element rootElement) { this.evt = evt; this.device = device; this.rootElement = rootElement; } public SipMsgInfo(RequestEvent evt, ParentPlatform platform, Element rootElement) { this.evt = evt; this.platform = platform; this.rootElement = rootElement; } public RequestEvent getEvt() { return evt; } public void setEvt(RequestEvent evt) { this.evt = evt; } public Device getDevice() { return device; } public void setDevice(Device device) { this.device = device; } public ParentPlatform getPlatform() { return platform; } public void setPlatform(ParentPlatform platform) { this.platform = platform; } public Element getRootElement() { return rootElement; } public void setRootElement(Element rootElement) { this.rootElement = rootElement; } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -14,18 +14,15 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.SerializeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.sip.Dialog; import javax.sip.DialogState; import javax.sip.RequestEvent; import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -22,8 +22,8 @@ import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.service.impl.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
@@ -21,6 +21,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; @@ -31,6 +33,7 @@ import javax.sip.message.Response; import java.text.ParseException; import java.util.concurrent.ConcurrentLinkedQueue; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.*; @@ -67,6 +70,15 @@ @Autowired private IDeviceChannelService deviceChannelService; private boolean taskQueueHandlerRun = false; private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; @Override public void afterPropertiesSet() throws Exception { notifyMessageHandler.addHandler(cmdType, this); @@ -75,114 +87,127 @@ @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { logger.info("[收到报警通知]设备:{}", device.getDeviceId()); // 回复200 OK try { responseAck(getServerTransaction(evt), Response.OK); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[收到报警通知], 回复200OK失败", e); } Element deviceIdElement = rootElement.element("DeviceID"); String channelId = deviceIdElement.getText().toString(); taskQueue.offer(new SipMsgInfo(evt, device, rootElement)); if (!taskQueueHandlerRun) { taskQueueHandlerRun = true; taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { SipMsgInfo sipMsgInfo = taskQueue.poll(); // 回复200 OK try { responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[收到报警通知], 回复200OK失败", e); } DeviceAlarm deviceAlarm = new DeviceAlarm(); 
deviceAlarm.setCreateTime(DateUtil.getNow()); deviceAlarm.setDeviceId(device.getDeviceId()); deviceAlarm.setChannelId(channelId); deviceAlarm.setAlarmPriority(getText(rootElement, "AlarmPriority")); deviceAlarm.setAlarmMethod(getText(rootElement, "AlarmMethod")); String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); if (alarmTime == null) { return; } deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); String alarmDescription = getText(rootElement, "AlarmDescription"); if (alarmDescription == null) { deviceAlarm.setAlarmDescription(""); } else { deviceAlarm.setAlarmDescription(alarmDescription); } String longitude = getText(rootElement, "Longitude"); if (longitude != null && NumericUtil.isDouble(longitude)) { deviceAlarm.setLongitude(Double.parseDouble(longitude)); } else { deviceAlarm.setLongitude(0.00); } String latitude = getText(rootElement, "Latitude"); if (latitude != null && NumericUtil.isDouble(latitude)) { deviceAlarm.setLatitude(Double.parseDouble(latitude)); } else { deviceAlarm.setLatitude(0.00); } Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID"); String channelId = deviceIdElement.getText().toString(); if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) { if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) { MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); mobilePosition.setTime(deviceAlarm.getAlarmTime()); mobilePosition.setLongitude(deviceAlarm.getLongitude()); mobilePosition.setLatitude(deviceAlarm.getLatitude()); mobilePosition.setReportSource("GPS Alarm"); DeviceAlarm deviceAlarm = new DeviceAlarm(); deviceAlarm.setCreateTime(DateUtil.getNow()); deviceAlarm.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); deviceAlarm.setChannelId(channelId); deviceAlarm.setAlarmPriority(getText(sipMsgInfo.getRootElement(), "AlarmPriority")); 
deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod")); String alarmTime = XmlUtil.getText(sipMsgInfo.getRootElement(), "AlarmTime"); if (alarmTime == null) { return; } deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); String alarmDescription = getText(sipMsgInfo.getRootElement(), "AlarmDescription"); if (alarmDescription == null) { deviceAlarm.setAlarmDescription(""); } else { deviceAlarm.setAlarmDescription(alarmDescription); } String longitude = getText(sipMsgInfo.getRootElement(), "Longitude"); if (longitude != null && NumericUtil.isDouble(longitude)) { deviceAlarm.setLongitude(Double.parseDouble(longitude)); } else { deviceAlarm.setLongitude(0.00); } String latitude = getText(sipMsgInfo.getRootElement(), "Latitude"); if (latitude != null && NumericUtil.isDouble(latitude)) { deviceAlarm.setLatitude(Double.parseDouble(latitude)); } else { deviceAlarm.setLatitude(0.00); } // 更新device channel 的经纬度 DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setDeviceId(device.getDeviceId()); deviceChannel.setChannelId(channelId); deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) { if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) { MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); mobilePosition.setTime(deviceAlarm.getAlarmTime()); mobilePosition.setLongitude(deviceAlarm.getLongitude()); mobilePosition.setLatitude(deviceAlarm.getLatitude()); mobilePosition.setReportSource("GPS Alarm"); deviceChannel = deviceChannelService.updateGps(deviceChannel, device); // 更新device channel 的经纬度 DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); 
deviceChannel.setChannelId(channelId); deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); deviceChannel = deviceChannelService.updateGps(deviceChannel, sipMsgInfo.getDevice()); if (userSetting.getSavePositionHistory()) { storager.insertMobilePosition(mobilePosition); mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); if (userSetting.getSavePositionHistory()) { storager.insertMobilePosition(mobilePosition); } storager.updateChannelPosition(deviceChannel); // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); jsonObject.put("time", mobilePosition.getTime()); jsonObject.put("serial", deviceChannel.getDeviceId()); jsonObject.put("code", deviceChannel.getChannelId()); jsonObject.put("longitude", mobilePosition.getLongitude()); jsonObject.put("latitude", mobilePosition.getLatitude()); jsonObject.put("altitude", mobilePosition.getAltitude()); jsonObject.put("direction", mobilePosition.getDirection()); jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); } } if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) { if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) { deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType")); } } if ("7".equals(deviceAlarm.getAlarmMethod()) ) { // 发送给平台的报警信息。 发送redis通知 AlarmChannelMessage alarmChannelMessage = new 
AlarmChannelMessage(); alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod())); alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription()); alarmChannelMessage.setGbId(channelId); redisCatchStorage.sendAlarmMsg(alarmChannelMessage); return; } logger.debug("存储报警信息、报警分类"); // 存储报警信息、报警分类 if (sipConfig.isAlarm()) { deviceAlarmService.add(deviceAlarm); } logger.info("[收到报警通知]内容:{}", JSONObject.toJSON(deviceAlarm)); if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) { publisher.deviceAlarmEventPublish(deviceAlarm); } } storager.updateChannelPosition(deviceChannel); // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); jsonObject.put("time", mobilePosition.getTime()); jsonObject.put("serial", deviceChannel.getDeviceId()); jsonObject.put("code", deviceChannel.getChannelId()); jsonObject.put("longitude", mobilePosition.getLongitude()); jsonObject.put("latitude", mobilePosition.getLatitude()); jsonObject.put("altitude", mobilePosition.getAltitude()); jsonObject.put("direction", mobilePosition.getDirection()); jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); } } if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) { if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) { deviceAlarm.setAlarmType(getText(rootElement.element("Info"), "AlarmType")); } taskQueueHandlerRun = false; }); } if ("7".equals(deviceAlarm.getAlarmMethod()) ) { // 发送给平台的报警信息。 发送redis通知 AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage(); alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod())); alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription()); alarmChannelMessage.setGbId(channelId); redisCatchStorage.sendAlarmMsg(alarmChannelMessage); return; } logger.debug("存储报警信息、报警分类"); // 存储报警信息、报警分类 if (sipConfig.isAlarm()) { deviceAlarmService.add(deviceAlarm); } logger.info("[收到报警通知]内容:{}", 
JSONObject.toJSON(deviceAlarm)); if (redisCatchStorage.deviceIsOnline(device.getDeviceId())) { publisher.deviceAlarmEventPublish(deviceAlarm); } } @Override src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
@@ -17,6 +17,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; @@ -26,6 +28,7 @@ import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; import java.util.concurrent.ConcurrentLinkedQueue; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; @@ -53,6 +56,14 @@ @Autowired private IDeviceChannelService deviceChannelService; private boolean taskQueueHandlerRun = false; private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; @Override public void afterPropertiesSet() throws Exception { notifyMessageHandler.addHandler(cmdType, this); @@ -61,78 +72,91 @@ @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { try { rootElement = getRootElement(evt, device.getCharset()); if (rootElement == null) { logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", evt.getRequest()); responseAck(getServerTransaction(evt), Response.BAD_REQUEST); return; } MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); if (!ObjectUtils.isEmpty(device.getName())) { mobilePosition.setDeviceName(device.getName()); } mobilePosition.setDeviceId(device.getDeviceId()); mobilePosition.setChannelId(getText(rootElement, "DeviceID")); mobilePosition.setTime(getText(rootElement, "Time")); mobilePosition.setLongitude(Double.parseDouble(getText(rootElement, "Longitude"))); mobilePosition.setLatitude(Double.parseDouble(getText(rootElement, "Latitude"))); if 
(NumericUtil.isDouble(getText(rootElement, "Speed"))) { mobilePosition.setSpeed(Double.parseDouble(getText(rootElement, "Speed"))); } else { mobilePosition.setSpeed(0.0); } if (NumericUtil.isDouble(getText(rootElement, "Direction"))) { mobilePosition.setDirection(Double.parseDouble(getText(rootElement, "Direction"))); } else { mobilePosition.setDirection(0.0); } if (NumericUtil.isDouble(getText(rootElement, "Altitude"))) { mobilePosition.setAltitude(Double.parseDouble(getText(rootElement, "Altitude"))); } else { mobilePosition.setAltitude(0.0); } mobilePosition.setReportSource("Mobile Position"); taskQueue.offer(new SipMsgInfo(evt, device, rootElement)); if (!taskQueueHandlerRun) { taskQueueHandlerRun = true; taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { SipMsgInfo sipMsgInfo = taskQueue.poll(); try { Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset()); if (rootElementAfterCharset == null) { logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest()); responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST); return; } MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); if (!ObjectUtils.isEmpty(sipMsgInfo.getDevice().getName())) { mobilePosition.setDeviceName(sipMsgInfo.getDevice().getName()); } mobilePosition.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); mobilePosition.setChannelId(getText(rootElementAfterCharset, "DeviceID")); mobilePosition.setTime(getText(rootElementAfterCharset, "Time")); mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude"))); mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude"))); if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Speed"))) { mobilePosition.setSpeed(Double.parseDouble(getText(rootElementAfterCharset, "Speed"))); } else { mobilePosition.setSpeed(0.0); } if 
(NumericUtil.isDouble(getText(rootElementAfterCharset, "Direction"))) { mobilePosition.setDirection(Double.parseDouble(getText(rootElementAfterCharset, "Direction"))); } else { mobilePosition.setDirection(0.0); } if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Altitude"))) { mobilePosition.setAltitude(Double.parseDouble(getText(rootElementAfterCharset, "Altitude"))); } else { mobilePosition.setAltitude(0.0); } mobilePosition.setReportSource("Mobile Position"); // 更新device channel 的经纬度 DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setDeviceId(device.getDeviceId()); deviceChannel.setChannelId(mobilePosition.getChannelId()); deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); // 更新device channel 的经纬度 DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); deviceChannel.setChannelId(mobilePosition.getChannelId()); deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); deviceChannel = deviceChannelService.updateGps(deviceChannel, device); deviceChannel = deviceChannelService.updateGps(deviceChannel, sipMsgInfo.getDevice()); mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); if (userSetting.getSavePositionHistory()) { storager.insertMobilePosition(mobilePosition); } 
storager.updateChannelPosition(deviceChannel); //回复 200 OK responseAck(getServerTransaction(evt), Response.OK); if (userSetting.getSavePositionHistory()) { storager.insertMobilePosition(mobilePosition); } storager.updateChannelPosition(deviceChannel); //回复 200 OK responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK); // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); jsonObject.put("time", mobilePosition.getTime()); jsonObject.put("serial", deviceChannel.getDeviceId()); jsonObject.put("code", deviceChannel.getChannelId()); jsonObject.put("longitude", mobilePosition.getLongitude()); jsonObject.put("latitude", mobilePosition.getLatitude()); jsonObject.put("altitude", mobilePosition.getAltitude()); jsonObject.put("direction", mobilePosition.getDirection()); jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); jsonObject.put("time", mobilePosition.getTime()); jsonObject.put("serial", deviceChannel.getDeviceId()); jsonObject.put("code", deviceChannel.getChannelId()); jsonObject.put("longitude", mobilePosition.getLongitude()); jsonObject.put("latitude", mobilePosition.getLatitude()); jsonObject.put("altitude", mobilePosition.getAltitude()); jsonObject.put("direction", mobilePosition.getDirection()); jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); } } taskQueueHandlerRun = false; }); } } @Override src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service.impl; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; @@ -58,6 +59,9 @@ @Autowired private GbStreamMapper gbStreamMapper; @Autowired private UserSetting userSetting; @@ -241,7 +245,7 @@ if (subscribe != null) { // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持 List<DeviceChannel> gbStreams = gbStreamMapper.queryGbStreamListInPlatform(platform.getServerGBId()); List<DeviceChannel> gbStreams = gbStreamMapper.queryGbStreamListInPlatform(platform.getServerGBId(), userSetting.isUsePushingAsStatus()); if (gbStreams.size() == 0) { return; } src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -131,23 +131,6 @@ StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); playResult.setDevice(device); result.onCompletion(()->{ // 点播结束时调用截图接口 taskExecutor.execute(()->{ // TODO 应该在上流时调用更好,结束也可能是错误结束 String path = "snap"; String fileName = deviceId + "_" + channelId + ".jpg"; WVPResult wvpResult = (WVPResult)result.getResult(); if (Objects.requireNonNull(wvpResult).getCode() == 0) { StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData(); MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId()); String streamUrl = streamInfoForSuccess.getFmp4(); // 请求截图 logger.info("[请求截图]: " + fileName); zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName); } }); }); if (streamInfo != null) { String streamId = streamInfo.getStream(); if (streamId == null) { @@ -209,6 +192,21 @@ SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); logger.info(JSONObject.toJSONString(ssrcInfo)); play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{ // 点播结束时调用截图接口 taskExecutor.execute(()->{ // TODO 应该在上流时调用更好,结束也可能是错误结束 String path = "snap"; String fileName = deviceId + "_" + channelId + ".jpg"; WVPResult wvpResult = (WVPResult)result.getResult(); if (Objects.requireNonNull(wvpResult).getCode() == 0) { StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData(); MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId()); String streamUrl = streamInfoForSuccess.getFmp4(); // 请求截图 logger.info("[请求截图]: " + fileName); zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName); } }); if (hookEvent != null) { hookEvent.response(mediaServerItem, response); } src/main/java/com/genersoft/iot/vmp/service/impl/RedisAlarmMsgListener.java
File was deleted src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java
File was deleted src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java
File was deleted src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java
File was deleted src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
// New file @@ -0,0 +1,100 @@
package com.genersoft.iot.vmp.service.redisMsg;

import com.alibaba.fastjson.JSON;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Redis listener for ALARM notifications.
 *
 * <p>Incoming messages are put on an in-memory queue and drained by a single
 * worker task submitted to {@code taskExecutor}, so the Redis listener thread
 * is never blocked by SIP traffic. Each message is converted to a
 * {@link DeviceAlarm} and forwarded either to every enabled parent platform
 * (when the message carries no gbId) or to the device / platform the gbId
 * resolves to.
 */
@Component
public class RedisAlarmMsgListener implements MessageListener {

    private final static Logger logger = LoggerFactory.getLogger(RedisAlarmMsgListener.class);

    @Autowired
    private ISIPCommander commander;

    @Autowired
    private ISIPCommanderForPlatform commanderForPlatform;

    @Autowired
    private IVideoManagerStorage storage;

    /**
     * {@code true} while a drain task is scheduled or running. Atomic because
     * it is touched by the listener thread(s) and by the worker thread.
     */
    private final AtomicBoolean taskQueueHandlerRun = new AtomicBoolean(false);

    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();

    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    /**
     * Queues the message and makes sure a drain task is running.
     *
     * @param message the Redis message whose body is a JSON alarm notification
     * @param bytes   the pattern the subscription matched (unused)
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        // Decode explicitly as UTF-8 so the log does not depend on the platform charset.
        logger.info("收到来自REDIS的ALARM通知: {}", new String(message.getBody(), StandardCharsets.UTF_8));
        taskQueue.offer(message);
        scheduleDrain();
    }

    /**
     * Submits a drain task unless one is already scheduled or running.
     */
    private void scheduleDrain() {
        // compareAndSet guarantees that only one caller wins and submits the task.
        if (!taskQueueHandlerRun.compareAndSet(false, true)) {
            return;
        }
        try {
            taskExecutor.execute(this::drainQueue);
        } catch (RuntimeException e) {
            // The task never started (e.g. it was rejected): release the flag,
            // otherwise no drain task would ever be submitted again.
            taskQueueHandlerRun.set(false);
            throw e;
        }
    }

    /**
     * Worker body: processes queued messages until the queue is empty.
     */
    private void drainQueue() {
        try {
            Message msg;
            while ((msg = taskQueue.poll()) != null) {
                try {
                    handleAlarmMessage(msg);
                } catch (RuntimeException e) {
                    // One bad message must not stop the remaining ones from being handled.
                    logger.error("[REDIS的ALARM通知] 处理消息时发生异常", e);
                }
            }
        } finally {
            // Always release the flag, whatever happened inside the loop.
            taskQueueHandlerRun.set(false);
        }
        // A message may have been offered after the last poll() but before the
        // flag was released; its producer saw the flag set and did not schedule.
        if (!taskQueue.isEmpty()) {
            scheduleDrain();
        }
    }

    /**
     * Converts one queued Redis message into a {@link DeviceAlarm} and sends it
     * to the matching target(s). A message that cannot be parsed is skipped.
     *
     * @param msg the queued Redis message
     */
    private void handleAlarmMessage(Message msg) {
        AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class);
        if (alarmChannelMessage == null) {
            logger.warn("[REDIS的ALARM通知]消息解析失败");
            return;
        }
        String gbId = alarmChannelMessage.getGbId();

        DeviceAlarm deviceAlarm = new DeviceAlarm();
        deviceAlarm.setCreateTime(DateUtil.getNow());
        deviceAlarm.setChannelId(gbId);
        deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription());
        deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn());
        deviceAlarm.setAlarmPriority("1");
        deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601());
        deviceAlarm.setAlarmType("1");
        deviceAlarm.setLongitude(0);
        deviceAlarm.setLatitude(0);

        if (ObjectUtils.isEmpty(gbId)) {
            // No target given: send to every enabled parent platform.
            List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true);
            for (ParentPlatform parentPlatform : parentPlatforms) {
                commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
            }
            return;
        }

        Device device = storage.queryVideoDevice(gbId);
        ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId);
        if (device != null && platform == null) {
            commander.sendAlarmMessage(device, deviceAlarm);
        } else if (device == null && platform != null) {
            commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
        } else {
            // Either both lookups matched or neither did.
            logger.warn("无法确定{}是平台还是设备", gbId);
        }
    }
}
// src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
File was renamed from src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java @@ -1,4 +1,4 @@ package com.genersoft.iot.vmp.service.impl; package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; @@ -19,8 +19,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.text.ParseException; @@ -28,6 +30,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -85,6 +88,14 @@ @Autowired private ZlmHttpHookSubscribe subscribe; private boolean taskQueueHandlerRun = false; private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; public interface PlayMsgCallback{ void handler(ResponseSendItemMsg responseSendItemMsg) throws ParseException; @@ -100,94 +111,107 @@ @Override public void onMessage(Message message, byte[] bytes) { JSONObject msgJSON = JSON.parseObject(message.getBody(), JSONObject.class); WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { return; } if (WvpRedisMsg.isRequest(wvpRedisMsg)) { logger.info("[收到REDIS通知] 请求: {}", new String(message.getBody())); switch (wvpRedisMsg.getCmd()){ case WvpRedisMsgCmd.GET_SEND_ITEM: RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class); requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), 
wvpRedisMsg.getSerial()); break; case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);; requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); break; default: break; } taskQueue.offer(message); if (!taskQueueHandlerRun) { taskQueueHandlerRun = true; taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { return; } if (WvpRedisMsg.isRequest(wvpRedisMsg)) { logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody())); }else { logger.info("[收到REDIS通知] 回复: {}", new String(message.getBody())); switch (wvpRedisMsg.getCmd()){ case WvpRedisMsgCmd.GET_SEND_ITEM: switch (wvpRedisMsg.getCmd()){ case WvpRedisMsgCmd.GET_SEND_ITEM: RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class); requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); break; case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);; requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); break; default: break; } WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); }else { logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody())); switch (wvpRedisMsg.getCmd()){ case WvpRedisMsgCmd.GET_SEND_ITEM: String key = wvpRedisMsg.getSerial(); switch (content.getCode()) { case 0: ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class); PlayMsgCallback playMsgCallback = callbacks.get(key); if (playMsgCallback != null) { 
callbacksForError.remove(key); try { playMsgCallback.handler(responseSendItemMsg); } catch (ParseException e) { throw new RuntimeException(e); WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); String key = wvpRedisMsg.getSerial(); switch (content.getCode()) { case 0: ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class); PlayMsgCallback playMsgCallback = callbacks.get(key); if (playMsgCallback != null) { callbacksForError.remove(key); try { playMsgCallback.handler(responseSendItemMsg); } catch (ParseException e) { throw new RuntimeException(e); } } break; case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: case ERROR_CODE_OFFLINE: case ERROR_CODE_TIMEOUT: PlayMsgErrorCallback errorCallback = callbacksForError.get(key); if (errorCallback != null) { callbacks.remove(key); errorCallback.handler(content); } break; default: break; } } break; case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: case ERROR_CODE_OFFLINE: case ERROR_CODE_TIMEOUT: PlayMsgErrorCallback errorCallback = callbacksForError.get(key); if (errorCallback != null) { callbacks.remove(key); errorCallback.handler(content); } break; default: break; break; case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); String serial = wvpRedisMsg.getSerial(); switch (wvpResult.getCode()) { case 0: JSONObject jsonObject = (JSONObject)wvpResult.getData(); PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); if (playMsgCallback != null) { callbacksForError.remove(serial); playMsgCallback.handler(jsonObject); } break; case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: case ERROR_CODE_OFFLINE: case ERROR_CODE_TIMEOUT: PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); if (errorCallback != null) { callbacks.remove(serial); errorCallback.handler(wvpResult); } break; default: break; } break; default: break; } } 
break; case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); String serial = wvpRedisMsg.getSerial(); switch (wvpResult.getCode()) { case 0: JSONObject jsonObject = (JSONObject)wvpResult.getData(); PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); if (playMsgCallback != null) { callbacksForError.remove(serial); playMsgCallback.handler(jsonObject); } break; case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: case ERROR_CODE_OFFLINE: case ERROR_CODE_TIMEOUT: PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); if (errorCallback != null) { callbacks.remove(serial); errorCallback.handler(wvpResult); } break; default: break; } break; default: break; } } taskQueueHandlerRun = false; }); } } /** src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
File was renamed from src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java @@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.service.impl; package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson.JSON; import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
// New file @@ -0,0 +1,75 @@
package com.genersoft.iot.vmp.service.redisMsg;

import com.alibaba.fastjson.JSON;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Receives push-stream results delivered through Redis.
 *
 * <p>Incoming messages are appended to an in-memory queue and drained by a single task
 * submitted to {@code taskExecutor}. Each parsed response is dispatched to the callback
 * registered under the key {@code app + stream}, if any.
 *
 * @author lin
 */
@Component
public class RedisPushStreamResponseListener implements MessageListener {

    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);

    /** True while a drain task is scheduled or running; guards against starting a second one. */
    private final AtomicBoolean taskQueueHandlerRun = new AtomicBoolean(false);

    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();

    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    /** Callbacks waiting for a push-stream result, keyed by {@code app + stream}. */
    private final Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();

    /** Callback invoked with the push-stream result for a registered app/stream pair. */
    public interface PushStreamResponseEvent{
        void run(MessageForPushChannelResponse response);
    }

    /**
     * Queues the message and makes sure a drain task is running.
     *
     * @param message the Redis message carrying the JSON response body
     * @param bytes   the pattern supplied by the listener container (unused)
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        logger.warn("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
        taskQueue.offer(message);
        startHandlerIfIdle();
    }

    /** Submits a drain task unless one is already scheduled or running. */
    private void startHandlerIfIdle() {
        if (!taskQueueHandlerRun.compareAndSet(false, true)) {
            return;
        }
        try {
            taskExecutor.execute(this::drainQueue);
        } catch (RuntimeException e) {
            // The task never started, so release the flag or no later message could start one.
            taskQueueHandlerRun.set(false);
            throw e;
        }
    }

    /** Handles queued messages until the queue is empty, then releases the running flag. */
    private void drainQueue() {
        try {
            Message msg;
            while ((msg = taskQueue.poll()) != null) {
                try {
                    handleMessage(msg);
                } catch (RuntimeException e) {
                    // One bad message must not stop the remaining ones from being processed.
                    logger.error("[REDIS消息-请求推流结果]: 处理失败", e);
                }
            }
        } finally {
            taskQueueHandlerRun.set(false);
            // A message may have been queued after the last poll but before the flag was
            // released; its producer saw the flag set and did not start a task, so do it here.
            if (!taskQueue.isEmpty()) {
                startHandlerIfIdle();
            }
        }
    }

    /** Parses one message and runs the callback registered for its app/stream, if present. */
    private void handleMessage(Message msg) {
        MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
        if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
            logger.info("[REDIS消息-请求推流结果]:参数不全");
            return;
        }
        // Look up the waiting invite callback once, so a concurrent removeEvent cannot
        // make it disappear between the null check and the call.
        PushStreamResponseEvent event = responseEvents.get(response.getApp() + response.getStream());
        if (event != null) {
            event.run(response);
        }
    }

    /**
     * Registers a callback for the given app/stream pair, replacing any previous one.
     *
     * @param app      application name
     * @param stream   stream id
     * @param callback callback to run when a result for this pair arrives
     */
    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
        responseEvents.put(app + stream, callback);
    }

    /**
     * Removes the callback registered for the given app/stream pair, if any.
     *
     * @param app    application name
     * @param stream stream id
     */
    public void removeEvent(String app, String stream) {
        responseEvents.remove(app + stream);
    }
}
// src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
// New file @@ -0,0 +1,103 @@
package com.genersoft.iot.vmp.service.redisMsg;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Receives push-stream device list update notifications sent through Redis.
 *
 * <p>Incoming messages are appended to an in-memory queue and drained by a single task
 * submitted to {@code taskExecutor}. Each message body is parsed as a JSON array of
 * {@link StreamPushItem}; items whose {@code app + stream} is not yet known are added,
 * the others are passed on for a gbId/name update.
 *
 * @author JiangFeng
 * @since 2022/8/16 11:32
 */
@Component
public class RedisPushStreamStatusListMsgListener implements MessageListener {

    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusListMsgListener.class);

    @Resource
    private IMediaServerService mediaServerService;

    @Resource
    private IStreamPushService streamPushService;

    @Resource
    private IGbStreamService gbStreamService;

    /** True while a drain task is scheduled or running; guards against starting a second one. */
    private final AtomicBoolean taskQueueHandlerRun = new AtomicBoolean(false);

    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();

    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    /**
     * Queues the message and makes sure a drain task is running.
     *
     * @param message the Redis message carrying the JSON array of stream push items
     * @param bytes   the pattern supplied by the listener container (unused)
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        logger.info("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody()));
        taskQueue.offer(message);
        startHandlerIfIdle();
    }

    /** Submits a drain task unless one is already scheduled or running. */
    private void startHandlerIfIdle() {
        if (!taskQueueHandlerRun.compareAndSet(false, true)) {
            return;
        }
        try {
            taskExecutor.execute(this::drainQueue);
        } catch (RuntimeException e) {
            // The task never started, so release the flag or no later message could start one.
            taskQueueHandlerRun.set(false);
            throw e;
        }
    }

    /** Handles queued messages until the queue is empty, then releases the running flag. */
    private void drainQueue() {
        try {
            Message msg;
            while ((msg = taskQueue.poll()) != null) {
                try {
                    handleMessage(msg);
                } catch (RuntimeException e) {
                    // One bad message must not stop the remaining ones from being processed.
                    logger.error("[REDIS消息-推流设备列表更新]: 处理失败", e);
                }
            }
        } finally {
            taskQueueHandlerRun.set(false);
            // A message may have been queued after the last poll but before the flag was
            // released; its producer saw the flag set and did not start a task, so do it here.
            if (!taskQueue.isEmpty()) {
                startHandlerIfIdle();
            }
        }
    }

    /** Splits the items of one message into new and existing streams and persists both groups. */
    private void handleMessage(Message msg) {
        List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
        if (streamPushItems == null || streamPushItems.isEmpty()) {
            // Nothing to add or update.
            return;
        }
        // All known app+stream keys, used to decide between add and update.
        // Copied into a set so each membership test is constant time.
        Set<String> allAppAndStream = new HashSet<>(streamPushService.getAllAppAndStream());

        // Items filtered by app+stream; these lists are handed to the services as-is.
        List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
        List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
        for (StreamPushItem streamPushItem : streamPushItems) {
            String app = streamPushItem.getApp();
            String stream = streamPushItem.getStream();
            boolean contains = allAppAndStream.contains(app + stream);
            if (!contains) {
                // Unknown stream: fill in the defaults and add it.
                streamPushItem.setStreamType("push");
                streamPushItem.setCreateTime(DateUtil.getNow());
                streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
                streamPushItem.setOriginType(2);
                streamPushItem.setOriginTypeStr("rtsp_push");
                streamPushItem.setTotalReaderCount("0");
                streamPushItemForSave.add(streamPushItem);
            } else {
                // Known stream: only name and gbId are updated.
                streamPushItemForUpdate.add(streamPushItem);
            }
        }
        if (streamPushItemForSave.size() > 0) {
            logger.info("添加{}条",streamPushItemForSave.size());
            logger.info(JSONObject.toJSONString(streamPushItemForSave));
            streamPushService.batchAdd(streamPushItemForSave);
        }
        if (streamPushItemForUpdate.size() > 0) {
            logger.info("修改{}条",streamPushItemForUpdate.size());
            logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
            gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
        }
    }
}
// src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
File was renamed from src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java @@ -1,24 +1,11 @@ package com.genersoft.iot.vmp.service.impl; package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -30,8 +17,6 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -65,7 +50,6 @@ @Override public void onMessage(Message message, byte[] bytes) { // TODO 增加队列 logger.warn("[REDIS消息-推流设备状态变化]: {}", new String(message.getBody())); taskQueue.offer(message); src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
// New file @@ -0,0 +1,91 @@
package com.genersoft.iot.vmp.service.redisMsg;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Receives stream change notifications sent by other WVP instances.
 *
 * <p>Incoming messages are appended to an in-memory queue and drained by a single task
 * submitted to {@code taskExecutor}. Messages whose {@code serverId} equals this
 * server's id are ignored; for the others the stream is added to or removed from
 * {@link ZLMMediaListManager} depending on the {@code register} field.
 *
 * @author lin
 */
@Component
public class RedisStreamMsgListener implements MessageListener {

    private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class);

    @Autowired
    private UserSetting userSetting;

    @Autowired
    private ZLMMediaListManager zlmMediaListManager;

    /** True while a drain task is scheduled or running; guards against starting a second one. */
    private final AtomicBoolean taskQueueHandlerRun = new AtomicBoolean(false);

    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();

    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    /**
     * Queues the message and makes sure a drain task is running.
     *
     * @param message the Redis message carrying the JSON stream change notification
     * @param bytes   the pattern supplied by the listener container (unused)
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        taskQueue.offer(message);
        startHandlerIfIdle();
    }

    /** Submits a drain task unless one is already scheduled or running. */
    private void startHandlerIfIdle() {
        if (!taskQueueHandlerRun.compareAndSet(false, true)) {
            return;
        }
        try {
            taskExecutor.execute(this::drainQueue);
        } catch (RuntimeException e) {
            // The task never started, so release the flag or no later message could start one.
            taskQueueHandlerRun.set(false);
            throw e;
        }
    }

    /** Handles queued messages until the queue is empty, then releases the running flag. */
    private void drainQueue() {
        try {
            Message msg;
            while ((msg = taskQueue.poll()) != null) {
                try {
                    handleMessage(msg);
                } catch (RuntimeException e) {
                    // One bad message must not stop the remaining ones from being processed.
                    logger.error("[收到redis 流变化]消息处理失败", e);
                }
            }
        } finally {
            taskQueueHandlerRun.set(false);
            // A message may have been queued after the last poll but before the flag was
            // released; its producer saw the flag set and did not start a task, so do it here.
            if (!taskQueue.isEmpty()) {
                startHandlerIfIdle();
            }
        }
    }

    /** Applies one stream change notification to the media list. */
    private void handleMessage(Message msg) {
        JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
        if (steamMsgJson == null) {
            logger.warn("[收到redis 流变化]消息解析失败");
            return;
        }
        String serverId = steamMsgJson.getString("serverId");
        if (userSetting.getServerId().equals(serverId)) {
            // Ignore messages sent by this server itself.
            return;
        }
        // Log the message being processed, not whichever message triggered the drain task.
        logger.info("[收到redis 流变化]: {}", new String(msg.getBody()));
        String app = steamMsgJson.getString("app");
        String stream = steamMsgJson.getString("stream");
        Boolean register = steamMsgJson.getBoolean("register");
        if (register == null) {
            // Without the flag it is unknown whether to add or remove the stream.
            logger.warn("[收到redis 流变化]消息缺少register字段");
            return;
        }
        if (!register) {
            zlmMediaListManager.removeMedia(app, stream);
            return;
        }
        String mediaServerId = steamMsgJson.getString("mediaServerId");
        MediaItem mediaItem = new MediaItem();
        mediaItem.setSeverId(serverId);
        mediaItem.setApp(app);
        mediaItem.setStream(stream);
        mediaItem.setRegist(register);
        mediaItem.setMediaServerId(mediaServerId);
        mediaItem.setCreateStamp(System.currentTimeMillis()/1000);
        mediaItem.setAliveSecond(0L);
        mediaItem.setTotalReaderCount("0");
        mediaItem.setOriginType(0);
        mediaItem.setOriginTypeStr("unknown");
        zlmMediaListManager.addPush(mediaItem);
    }
}
// src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -75,18 +75,23 @@ "WHERE gs.gbId = '${gbId}' AND pgs.platformId = '${platformId}'") GbStream queryStreamInPlatform(String platformId, String gbId); @Select("select gt.gbId as channelId, gt.name, 'wvp-pro' as manufacture, st.status, gt.longitude, gt.latitude, pc.id as parentId," + @Select("<script> "+ "select gt.gbId as channelId, gt.name, 'wvp-pro' as manufacture, st.status, gt.longitude, gt.latitude, pc.id as parentId," + " '1' as registerWay, pc.civilCode, 'live' as model, 'wvp-pro' as owner, '0' as parental,'0' as secrecy" + " from gb_stream gt " + " left join (" + " select sp.status, sp.app, sp.stream from stream_push sp" + " select " + " <if test='usPushingAsStatus != true'> sp.status as status, </if>" + " <if test='usPushingAsStatus == true'> sp.pushIng as status, </if>" + "sp.app, sp.stream from stream_push sp" + " union all" + " select spxy.status, spxy.app, spxy.stream from stream_proxy spxy" + " ) st on st.app = gt.app and st.stream = gt.stream" + " left join platform_gb_stream pgs on gt.gbStreamId = pgs.gbStreamId" + " left join platform_catalog pc on pgs.catalogId = pc.id and pgs.platformId = pc.platformId" + " where pgs.platformId=#{platformId}") List<DeviceChannel> queryGbStreamListInPlatform(String platformId); " where pgs.platformId=#{platformId}" + "</script>") List<DeviceChannel> queryGbStreamListInPlatform(String platformId, boolean usPushingAsStatus); @Select("SELECT gs.* FROM gb_stream gs LEFT JOIN platform_gb_stream pgs " + src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
@@ -2,6 +2,7 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; @@ -82,6 +83,9 @@ @Autowired private GbStreamMapper gbStreamMapper; @Autowired private UserSetting userSetting; @Autowired private PlatformCatalogMapper catalogMapper; @@ -614,7 +618,7 @@ */ @Override public List<DeviceChannel> queryGbStreamListInPlatform(String platformId) { return gbStreamMapper.queryGbStreamListInPlatform(platformId); return gbStreamMapper.queryGbStreamListInPlatform(platformId, userSetting.isUsePushingAsStatus()); } /** src/main/resources/all-application.yml
@@ -188,6 +188,8 @@ record-sip: true # 是否将日志存储进数据库 logInDatebase: true # 使用推流状态作为推流通道状态 use-pushing-as-status: true # 关闭在线文档(生产环境建议关闭) springdoc: