From 5e73874880cdfd5b6b99147a0cdd8a6eabcfbf16 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 22 九月 2022 11:22:08 +0800 Subject: [PATCH] 添加队列处理redis消息和sip消息,支持使用推流状态作为通道在线状态 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java | 152 ++++--- src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java | 6 src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 32 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java | 18 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java | 75 +++ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java | 219 ++++++---- src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java | 3 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java | 91 ++++ src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java | 103 +++++ src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java | 2 src/main/resources/all-application.yml | 2 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java | 5 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java | 100 +++++ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 4 /dev/null | 72 --- src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java | 6 src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java | 10 src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java | 13 src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipMsgInfo.java | 56 ++ src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java | 180 +++++--- 20 files changed, 789 insertions(+), 360 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index 017b39d..cad6e69 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -31,6 +31,8 @@ private Boolean logInDatebase = Boolean.TRUE; + private Boolean usePushingAsStatus = Boolean.TRUE; + private String serverId = "000000"; private String thirdPartyGBIdReg = "[\\s\\S]*"; @@ -136,4 +138,12 @@ public void setPlatformPlayTimeout(int platformPlayTimeout) { this.platformPlayTimeout = platformPlayTimeout; } + + public Boolean isUsePushingAsStatus() { + return usePushingAsStatus; + } + + public void setUsePushingAsStatus(Boolean usePushingAsStatus) { + this.usePushingAsStatus = usePushingAsStatus; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java index 0b653cf..e412f71 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java @@ -3,7 +3,7 @@ import com.alibaba.fastjson.parser.ParserConfig; import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.service.impl.*; +import com.genersoft.iot.vmp.service.redisMsg.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.context.annotation.Bean; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipMsgInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipMsgInfo.java new file mode 100644 index 0000000..302539b --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipMsgInfo.java @@ -0,0 +1,56 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import org.dom4j.Element; + +import javax.sip.RequestEvent; + +public class SipMsgInfo { + private RequestEvent evt; + private Device device; + private ParentPlatform platform; + private Element rootElement; + + public SipMsgInfo(RequestEvent evt, Device device, Element rootElement) { + this.evt = evt; + this.device = device; + this.rootElement = rootElement; + } + + public SipMsgInfo(RequestEvent evt, ParentPlatform platform, Element rootElement) { + this.evt = evt; + this.platform = platform; + this.rootElement = rootElement; + } + + public RequestEvent getEvt() { + return evt; + } + + public void setEvt(RequestEvent evt) { + this.evt = evt; + } + + public Device getDevice() { + return device; + } + + public void setDevice(Device device) { + this.device = device; + } + + public ParentPlatform getPlatform() { + return platform; + } + + public void setPlatform(ParentPlatform platform) { + this.platform = platform; + } + + public Element getRootElement() { + return rootElement; + } + + public void setRootElement(Element rootElement) { + this.rootElement = rootElement; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index f8c3abf..33cc6e6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -14,18 +14,15 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; -import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; +import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.utils.SerializeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.sip.Dialog; -import javax.sip.DialogState; import javax.sip.RequestEvent; import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 6ff5c0f..ff3c78f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -22,8 +22,8 @@ import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; -import com.genersoft.iot.vmp.service.impl.RedisPushStreamResponseListener; +import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; +import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java index add32d1..da2cb9c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java @@ -21,6 +21,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; @@ -31,6 +33,7 @@ import javax.sip.message.Response; import java.text.ParseException; +import java.util.concurrent.ConcurrentLinkedQueue; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.*; @@ -67,6 +70,15 @@ @Autowired private IDeviceChannelService deviceChannelService; + private boolean taskQueueHandlerRun = false; + + private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Override public void afterPropertiesSet() throws Exception { notifyMessageHandler.addHandler(cmdType, this); @@ -75,114 +87,127 @@ @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { logger.info("[鏀跺埌鎶ヨ閫氱煡]璁惧锛歿}", device.getDeviceId()); - // 鍥炲200 OK - try { - responseAck(getServerTransaction(evt), Response.OK); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鏀跺埌鎶ヨ閫氱煡], 鍥炲200OK澶辫触", e); - } - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getText().toString(); + taskQueue.offer(new SipMsgInfo(evt, device, rootElement)); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + SipMsgInfo sipMsgInfo = taskQueue.poll(); + // 鍥炲200 OK + try { + responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鏀跺埌鎶ヨ閫氱煡], 鍥炲200OK澶辫触", e); + } - DeviceAlarm deviceAlarm = new DeviceAlarm(); - deviceAlarm.setCreateTime(DateUtil.getNow()); - deviceAlarm.setDeviceId(device.getDeviceId()); - deviceAlarm.setChannelId(channelId); - deviceAlarm.setAlarmPriority(getText(rootElement, "AlarmPriority")); - deviceAlarm.setAlarmMethod(getText(rootElement, "AlarmMethod")); - String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); - if (alarmTime == null) { - return; - } - deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); - String alarmDescription = getText(rootElement, "AlarmDescription"); - if (alarmDescription == null) { - deviceAlarm.setAlarmDescription(""); - } else { - deviceAlarm.setAlarmDescription(alarmDescription); - } - String longitude = getText(rootElement, "Longitude"); - if (longitude != null && NumericUtil.isDouble(longitude)) { - deviceAlarm.setLongitude(Double.parseDouble(longitude)); - } else { - deviceAlarm.setLongitude(0.00); - } - String latitude = getText(rootElement, "Latitude"); - if (latitude != null && NumericUtil.isDouble(latitude)) { - deviceAlarm.setLatitude(Double.parseDouble(latitude)); - } else { - deviceAlarm.setLatitude(0.00); - } + Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID"); + String channelId = deviceIdElement.getText().toString(); - if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) { - if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) { - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setCreateTime(DateUtil.getNow()); - mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); - mobilePosition.setTime(deviceAlarm.getAlarmTime()); - mobilePosition.setLongitude(deviceAlarm.getLongitude()); - mobilePosition.setLatitude(deviceAlarm.getLatitude()); - mobilePosition.setReportSource("GPS Alarm"); + DeviceAlarm deviceAlarm = new DeviceAlarm(); + deviceAlarm.setCreateTime(DateUtil.getNow()); + deviceAlarm.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); + deviceAlarm.setChannelId(channelId); + deviceAlarm.setAlarmPriority(getText(sipMsgInfo.getRootElement(), "AlarmPriority")); + deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod")); + String alarmTime = XmlUtil.getText(sipMsgInfo.getRootElement(), "AlarmTime"); + if (alarmTime == null) { + return; + } + deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); + String alarmDescription = getText(sipMsgInfo.getRootElement(), "AlarmDescription"); + if (alarmDescription == null) { + deviceAlarm.setAlarmDescription(""); + } else { + deviceAlarm.setAlarmDescription(alarmDescription); + } + String longitude = getText(sipMsgInfo.getRootElement(), "Longitude"); + if (longitude != null && NumericUtil.isDouble(longitude)) { + deviceAlarm.setLongitude(Double.parseDouble(longitude)); + } else { + deviceAlarm.setLongitude(0.00); + } + String latitude = getText(sipMsgInfo.getRootElement(), "Latitude"); + if (latitude != null && NumericUtil.isDouble(latitude)) { + deviceAlarm.setLatitude(Double.parseDouble(latitude)); + } else { + deviceAlarm.setLatitude(0.00); + } - // 鏇存柊device channel 鐨勭粡绾害 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setChannelId(channelId); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); + if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) { + if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) { + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setCreateTime(DateUtil.getNow()); + mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); + mobilePosition.setTime(deviceAlarm.getAlarmTime()); + mobilePosition.setLongitude(deviceAlarm.getLongitude()); + mobilePosition.setLatitude(deviceAlarm.getLatitude()); + mobilePosition.setReportSource("GPS Alarm"); - deviceChannel = deviceChannelService.updateGps(deviceChannel, device); + // 鏇存柊device channel 鐨勭粡绾害 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); + deviceChannel.setChannelId(channelId); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + deviceChannel = deviceChannelService.updateGps(deviceChannel, sipMsgInfo.getDevice()); - if (userSetting.getSavePositionHistory()) { - storager.insertMobilePosition(mobilePosition); + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + + if (userSetting.getSavePositionHistory()) { + storager.insertMobilePosition(mobilePosition); + } + storager.updateChannelPosition(deviceChannel); + + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", mobilePosition.getTime()); + jsonObject.put("serial", deviceChannel.getDeviceId()); + jsonObject.put("code", deviceChannel.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); + } + } + if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) { + if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) { + deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType")); + } + } + + if ("7".equals(deviceAlarm.getAlarmMethod()) ) { + // 鍙戦�佺粰骞冲彴鐨勬姤璀︿俊鎭�� 鍙戦�乺edis閫氱煡 + AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage(); + alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod())); + alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription()); + alarmChannelMessage.setGbId(channelId); + redisCatchStorage.sendAlarmMsg(alarmChannelMessage); + return; + } + + logger.debug("瀛樺偍鎶ヨ淇℃伅銆佹姤璀﹀垎绫�"); + // 瀛樺偍鎶ヨ淇℃伅銆佹姤璀﹀垎绫� + if (sipConfig.isAlarm()) { + deviceAlarmService.add(deviceAlarm); + } + logger.info("[鏀跺埌鎶ヨ閫氱煡]鍐呭锛歿}", JSONObject.toJSON(deviceAlarm)); + if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) { + publisher.deviceAlarmEventPublish(deviceAlarm); + } } - storager.updateChannelPosition(deviceChannel); - - // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� - JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", mobilePosition.getTime()); - jsonObject.put("serial", deviceChannel.getDeviceId()); - jsonObject.put("code", deviceChannel.getChannelId()); - jsonObject.put("longitude", mobilePosition.getLongitude()); - jsonObject.put("latitude", mobilePosition.getLatitude()); - jsonObject.put("altitude", mobilePosition.getAltitude()); - jsonObject.put("direction", mobilePosition.getDirection()); - jsonObject.put("speed", mobilePosition.getSpeed()); - redisCatchStorage.sendMobilePositionMsg(jsonObject); - } - } - if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) { - if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) { - deviceAlarm.setAlarmType(getText(rootElement.element("Info"), "AlarmType")); - } + taskQueueHandlerRun = false; + }); } - if ("7".equals(deviceAlarm.getAlarmMethod()) ) { - // 鍙戦�佺粰骞冲彴鐨勬姤璀︿俊鎭�� 鍙戦�乺edis閫氱煡 - AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage(); - alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod())); - alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription()); - alarmChannelMessage.setGbId(channelId); - redisCatchStorage.sendAlarmMsg(alarmChannelMessage); - return; - } - logger.debug("瀛樺偍鎶ヨ淇℃伅銆佹姤璀﹀垎绫�"); - // 瀛樺偍鎶ヨ淇℃伅銆佹姤璀﹀垎绫� - if (sipConfig.isAlarm()) { - deviceAlarmService.add(deviceAlarm); - } - logger.info("[鏀跺埌鎶ヨ閫氱煡]鍐呭锛歿}", JSONObject.toJSON(deviceAlarm)); - if (redisCatchStorage.deviceIsOnline(device.getDeviceId())) { - publisher.deviceAlarmEventPublish(deviceAlarm); - } } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java index b5051c0..67d26d7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java @@ -17,6 +17,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; @@ -26,6 +28,7 @@ import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; +import java.util.concurrent.ConcurrentLinkedQueue; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; @@ -53,6 +56,14 @@ @Autowired private IDeviceChannelService deviceChannelService; + private boolean taskQueueHandlerRun = false; + + private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + @Override public void afterPropertiesSet() throws Exception { notifyMessageHandler.addHandler(cmdType, this); @@ -61,78 +72,91 @@ @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { - try { - rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ 绉诲姩璁惧浣嶇疆鏁版嵁閫氱煡 ] content cannot be null, {}", evt.getRequest()); - responseAck(getServerTransaction(evt), Response.BAD_REQUEST); - return; - } - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setCreateTime(DateUtil.getNow()); - if (!ObjectUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); - } - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setChannelId(getText(rootElement, "DeviceID")); - mobilePosition.setTime(getText(rootElement, "Time")); - mobilePosition.setLongitude(Double.parseDouble(getText(rootElement, "Longitude"))); - mobilePosition.setLatitude(Double.parseDouble(getText(rootElement, "Latitude"))); - if (NumericUtil.isDouble(getText(rootElement, "Speed"))) { - mobilePosition.setSpeed(Double.parseDouble(getText(rootElement, "Speed"))); - } else { - mobilePosition.setSpeed(0.0); - } - if (NumericUtil.isDouble(getText(rootElement, "Direction"))) { - mobilePosition.setDirection(Double.parseDouble(getText(rootElement, "Direction"))); - } else { - mobilePosition.setDirection(0.0); - } - if (NumericUtil.isDouble(getText(rootElement, "Altitude"))) { - mobilePosition.setAltitude(Double.parseDouble(getText(rootElement, "Altitude"))); - } else { - mobilePosition.setAltitude(0.0); - } - mobilePosition.setReportSource("Mobile Position"); + taskQueue.offer(new SipMsgInfo(evt, device, rootElement)); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + SipMsgInfo sipMsgInfo = taskQueue.poll(); + try { + Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset()); + if (rootElementAfterCharset == null) { + logger.warn("[ 绉诲姩璁惧浣嶇疆鏁版嵁閫氱煡 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest()); + responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST); + return; + } + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setCreateTime(DateUtil.getNow()); + if (!ObjectUtils.isEmpty(sipMsgInfo.getDevice().getName())) { + mobilePosition.setDeviceName(sipMsgInfo.getDevice().getName()); + } + mobilePosition.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); + mobilePosition.setChannelId(getText(rootElementAfterCharset, "DeviceID")); + mobilePosition.setTime(getText(rootElementAfterCharset, "Time")); + mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude"))); + mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude"))); + if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Speed"))) { + mobilePosition.setSpeed(Double.parseDouble(getText(rootElementAfterCharset, "Speed"))); + } else { + mobilePosition.setSpeed(0.0); + } + if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Direction"))) { + mobilePosition.setDirection(Double.parseDouble(getText(rootElementAfterCharset, "Direction"))); + } else { + mobilePosition.setDirection(0.0); + } + if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Altitude"))) { + mobilePosition.setAltitude(Double.parseDouble(getText(rootElementAfterCharset, "Altitude"))); + } else { + mobilePosition.setAltitude(0.0); + } + mobilePosition.setReportSource("Mobile Position"); - // 鏇存柊device channel 鐨勭粡绾害 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setChannelId(mobilePosition.getChannelId()); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); + // 鏇存柊device channel 鐨勭粡绾害 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); + deviceChannel.setChannelId(mobilePosition.getChannelId()); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); - deviceChannel = deviceChannelService.updateGps(deviceChannel, device); + deviceChannel = deviceChannelService.updateGps(deviceChannel, sipMsgInfo.getDevice()); - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - if (userSetting.getSavePositionHistory()) { - storager.insertMobilePosition(mobilePosition); - } - storager.updateChannelPosition(deviceChannel); - //鍥炲 200 OK - responseAck(getServerTransaction(evt), Response.OK); + if (userSetting.getSavePositionHistory()) { + storager.insertMobilePosition(mobilePosition); + } + storager.updateChannelPosition(deviceChannel); + //鍥炲 200 OK + responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK); - // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� - JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", mobilePosition.getTime()); - jsonObject.put("serial", deviceChannel.getDeviceId()); - jsonObject.put("code", deviceChannel.getChannelId()); - jsonObject.put("longitude", mobilePosition.getLongitude()); - jsonObject.put("latitude", mobilePosition.getLatitude()); - jsonObject.put("altitude", mobilePosition.getAltitude()); - jsonObject.put("direction", mobilePosition.getDirection()); - jsonObject.put("speed", mobilePosition.getSpeed()); - redisCatchStorage.sendMobilePositionMsg(jsonObject); + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", mobilePosition.getTime()); + jsonObject.put("serial", deviceChannel.getDeviceId()); + jsonObject.put("code", deviceChannel.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { - e.printStackTrace(); + } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + e.printStackTrace(); + } + + } + taskQueueHandlerRun = false; + }); } + + } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index dedc7c1..3c5644b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service.impl; import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; @@ -58,6 +59,9 @@ @Autowired private GbStreamMapper gbStreamMapper; + + @Autowired + private UserSetting userSetting; @@ -241,7 +245,7 @@ if (subscribe != null) { // TODO 鏆傛椂鍙鐞嗚棰戞祦鐨勫洖澶�,鍚庣画澧炲姞瀵瑰浗鏍囪澶囩殑鏀寔 - List<DeviceChannel> gbStreams = gbStreamMapper.queryGbStreamListInPlatform(platform.getServerGBId()); + List<DeviceChannel> gbStreams = gbStreamMapper.queryGbStreamListInPlatform(platform.getServerGBId(), userSetting.isUsePushingAsStatus()); if (gbStreams.size() == 0) { return; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index c0b9e95..6662738 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -131,23 +131,6 @@ StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); playResult.setDevice(device); - result.onCompletion(()->{ - // 鐐规挱缁撴潫鏃惰皟鐢ㄦ埅鍥炬帴鍙� - taskExecutor.execute(()->{ - // TODO 搴旇鍦ㄤ笂娴佹椂璋冪敤鏇村ソ锛岀粨鏉熶篃鍙兘鏄敊璇粨鏉� - String path = "snap"; - String fileName = deviceId + "_" + channelId + ".jpg"; - WVPResult wvpResult = (WVPResult)result.getResult(); - if (Objects.requireNonNull(wvpResult).getCode() == 0) { - StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData(); - MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId()); - String streamUrl = streamInfoForSuccess.getFmp4(); - // 璇锋眰鎴浘 - logger.info("[璇锋眰鎴浘]: " + fileName); - zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName); - } - }); - }); if (streamInfo != null) { String streamId = streamInfo.getStream(); if (streamId == null) { @@ -209,6 +192,21 @@ SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); logger.info(JSONObject.toJSONString(ssrcInfo)); play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{ + // 鐐规挱缁撴潫鏃惰皟鐢ㄦ埅鍥炬帴鍙� + taskExecutor.execute(()->{ + // TODO 搴旇鍦ㄤ笂娴佹椂璋冪敤鏇村ソ锛岀粨鏉熶篃鍙兘鏄敊璇粨鏉� + String path = "snap"; + String fileName = deviceId + "_" + channelId + ".jpg"; + WVPResult wvpResult = (WVPResult)result.getResult(); + if (Objects.requireNonNull(wvpResult).getCode() == 0) { + StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData(); + MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId()); + String streamUrl = streamInfoForSuccess.getFmp4(); + // 璇锋眰鎴浘 + logger.info("[璇锋眰鎴浘]: " + fileName); + zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName); + } + }); if (hookEvent != null) { hookEvent.response(mediaServerItem, response); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisAlarmMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisAlarmMsgListener.java deleted file mode 100644 index 1634234..0000000 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisAlarmMsgListener.java +++ /dev/null @@ -1,77 +0,0 @@ -package com.genersoft.iot.vmp.service.impl; - -import com.alibaba.fastjson.JSON; -import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; -import com.genersoft.iot.vmp.service.IPlatformChannelService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.utils.DateUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; - -import java.util.List; - - -@Component -public class RedisAlarmMsgListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisAlarmMsgListener.class); - - @Autowired - private ISIPCommander commander; - - @Autowired - private ISIPCommanderForPlatform commanderForPlatform; - - @Autowired - private IVideoManagerStorage storage; - - @Override - public void onMessage(Message message, byte[] bytes) { - logger.info("鏀跺埌鏉ヨ嚜REDIS鐨凙LARM閫氱煡锛� {}", new String(message.getBody())); - AlarmChannelMessage alarmChannelMessage = JSON.parseObject(message.getBody(), AlarmChannelMessage.class); - if (alarmChannelMessage == null) { - logger.warn("[REDIS鐨凙LARM閫氱煡]娑堟伅瑙f瀽澶辫触"); - return; - } - String gbId = alarmChannelMessage.getGbId(); - - DeviceAlarm deviceAlarm = new DeviceAlarm(); - deviceAlarm.setCreateTime(DateUtil.getNow()); - deviceAlarm.setChannelId(gbId); - deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription()); - deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn()); - deviceAlarm.setAlarmPriority("1"); - deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601()); - deviceAlarm.setAlarmType("1"); - deviceAlarm.setLongitude(0); - deviceAlarm.setLatitude(0); - - if (ObjectUtils.isEmpty(gbId)) { - // 鍙戦�佺粰鎵�鏈夌殑涓婄骇 - List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true); - if (parentPlatforms.size() > 0) { - for (ParentPlatform parentPlatform : parentPlatforms) { - commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm); - } - } - }else { - Device device = storage.queryVideoDevice(gbId); - ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId); - if (device != null && platform == null) { - commander.sendAlarmMessage(device, deviceAlarm); - }else if (device == null && platform != null){ - commanderForPlatform.sendAlarmMessage(platform, deviceAlarm); - }else { - logger.warn("鏃犳硶纭畾" + gbId + "鏄钩鍙拌繕鏄澶�"); - } - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java deleted file mode 100644 index 56c9ff3..0000000 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.genersoft.iot.vmp.service.impl; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; -import com.genersoft.iot.vmp.service.IGbStreamService; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IStreamPushService; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; -import com.genersoft.iot.vmp.utils.DateUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; - -import javax.annotation.Resource; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * 鎺ユ敹redis杩斿洖鐨勬帹娴佺粨鏋� - * @author lin - */ -@Component -public class RedisPushStreamResponseListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class); - - private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>(); - - public interface PushStreamResponseEvent{ - void run(MessageForPushChannelResponse response); - } - - @Override - public void onMessage(Message message, byte[] bytes) { - // - logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛� {}", new String(message.getBody())); - MessageForPushChannelResponse response = JSON.parseObject(new String(message.getBody()), MessageForPushChannelResponse.class); - if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ - logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�"); - return; - } - // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅 - if (responseEvents.get(response.getApp() + response.getStream()) != null) { - responseEvents.get(response.getApp() + response.getStream()).run(response); - } - } - - public void addEvent(String app, String stream, PushStreamResponseEvent callback) { - responseEvents.put(app + stream, callback); - } - - public void removeEvent(String app, String stream) { - responseEvents.remove(app + stream); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java deleted file mode 100644 index bedbf44..0000000 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.genersoft.iot.vmp.service.impl; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; -import com.genersoft.iot.vmp.service.IGbStreamService; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IStreamPushService; -import com.genersoft.iot.vmp.utils.DateUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; -import java.util.*; - -/** - * @Auther: JiangFeng - * @Date: 2022/8/16 11:32 - * @Description: 鎺ユ敹redis鍙戦�佺殑鎺ㄦ祦璁惧鍒楄〃鏇存柊閫氱煡 - */ -@Component -public class RedisPushStreamStatusListMsgListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusListMsgListener.class); - @Resource - private IMediaServerService mediaServerService; - - @Resource - private IStreamPushService streamPushService; - @Resource - private IGbStreamService gbStreamService; - - @Override - public void onMessage(Message message, byte[] bytes) { - // - logger.warn("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊]锛� {}", new String(message.getBody())); - List<StreamPushItem> streamPushItems = JSON.parseArray(new String(message.getBody()), StreamPushItem.class); - //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀� - List<String> allAppAndStream = streamPushService.getAllAppAndStream(); - - /** - * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛� - */ - List<StreamPushItem> streamPushItemForSave = new ArrayList<>(); - List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>(); - for (StreamPushItem streamPushItem : streamPushItems) { - String app = streamPushItem.getApp(); - String stream = streamPushItem.getStream(); - boolean contains = allAppAndStream.contains(app + stream); - //涓嶅瓨鍦ㄥ氨娣诲姞 - if (!contains) { - streamPushItem.setStreamType("push"); - streamPushItem.setCreateTime(DateUtil.getNow()); - streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); - streamPushItem.setOriginType(2); - streamPushItem.setOriginTypeStr("rtsp_push"); - streamPushItem.setTotalReaderCount("0"); - streamPushItemForSave.add(streamPushItem); - } else { - //瀛樺湪灏卞彧淇敼 name鍜実bId - streamPushItemForUpdate.add(streamPushItem); - } - } - if (streamPushItemForSave.size() > 0) { - - logger.info("娣诲姞{}鏉�",streamPushItemForSave.size()); - logger.info(JSONObject.toJSONString(streamPushItemForSave)); - streamPushService.batchAdd(streamPushItemForSave); - - } - if(streamPushItemForUpdate.size()>0){ - logger.info("淇敼{}鏉�",streamPushItemForUpdate.size()); - logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); - gbStreamService.updateGbIdOrName(streamPushItemForUpdate); - } - - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java deleted file mode 100644 index 118a227..0000000 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.genersoft.iot.vmp.service.impl; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import com.genersoft.iot.vmp.conf.UserSetting; - -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; -import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; -import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.stereotype.Component; - - -/** - * 鎺ユ敹鍏朵粬wvp鍙戦�佹祦鍙樺寲閫氱煡 - * @author lin - */ -@Component -public class RedisStreamMsgListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class); - - @Autowired - private UserSetting userSetting; - - @Autowired - private ZLMMediaListManager zlmMediaListManager; - - @Override - public void onMessage(Message message, byte[] bytes) { - - JSONObject steamMsgJson = JSON.parseObject(message.getBody(), JSONObject.class); - if (steamMsgJson == null) { - logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触"); - return; - } - String serverId = steamMsgJson.getString("serverId"); - - if (userSetting.getServerId().equals(serverId)) { - // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲 - return; - } - logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody())); - String app = steamMsgJson.getString("app"); - String stream = steamMsgJson.getString("stream"); - boolean register = steamMsgJson.getBoolean("register"); - String mediaServerId = steamMsgJson.getString("mediaServerId"); - MediaItem mediaItem = new MediaItem(); - mediaItem.setSeverId(serverId); - mediaItem.setApp(app); - mediaItem.setStream(stream); - mediaItem.setRegist(register); - mediaItem.setMediaServerId(mediaServerId); - mediaItem.setCreateStamp(System.currentTimeMillis()/1000); - mediaItem.setAliveSecond(0L); - mediaItem.setTotalReaderCount("0"); - mediaItem.setOriginType(0); - mediaItem.setOriginTypeStr("0"); - mediaItem.setOriginTypeStr("unknown"); - if (register) { - zlmMediaListManager.addPush(mediaItem); - }else { - zlmMediaListManager.removeMedia(app, stream); - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java new file mode 100644 index 0000000..84f5ef7 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java @@ -0,0 +1,100 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson.JSON; +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.utils.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; + + +@Component +public class RedisAlarmMsgListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisAlarmMsgListener.class); + + @Autowired + private ISIPCommander commander; + + @Autowired + private ISIPCommanderForPlatform commanderForPlatform; + + @Autowired + private IVideoManagerStorage storage; + + private boolean taskQueueHandlerRun = false; + + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Override + public void onMessage(Message message, byte[] bytes) { + logger.info("鏀跺埌鏉ヨ嚜REDIS鐨凙LARM閫氱煡锛� {}", new String(message.getBody())); + + taskQueue.offer(message); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + + AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class); + if (alarmChannelMessage == null) { + logger.warn("[REDIS鐨凙LARM閫氱煡]娑堟伅瑙f瀽澶辫触"); + return; + } + String gbId = alarmChannelMessage.getGbId(); + + DeviceAlarm deviceAlarm = new DeviceAlarm(); + deviceAlarm.setCreateTime(DateUtil.getNow()); + deviceAlarm.setChannelId(gbId); + deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription()); + deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn()); + deviceAlarm.setAlarmPriority("1"); + deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601()); + deviceAlarm.setAlarmType("1"); + deviceAlarm.setLongitude(0); + deviceAlarm.setLatitude(0); + + if (ObjectUtils.isEmpty(gbId)) { + // 鍙戦�佺粰鎵�鏈夌殑涓婄骇 + List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true); + if (parentPlatforms.size() > 0) { + for (ParentPlatform parentPlatform : parentPlatforms) { + commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm); + } + } + }else { + Device device = storage.queryVideoDevice(gbId); + ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId); + if (device != null && platform == null) { + commander.sendAlarmMessage(device, deviceAlarm); + }else if (device == null && platform != null){ + commanderForPlatform.sendAlarmMessage(platform, deviceAlarm); + }else { + logger.warn("鏃犳硶纭畾" + gbId + "鏄钩鍙拌繕鏄澶�"); + } + } + } + taskQueueHandlerRun = false; + }); + } + + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java similarity index 67% rename from src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java rename to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index e327d0f..e6ba9b6 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.service.impl; +package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; @@ -19,8 +19,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.text.ParseException; @@ -28,6 +30,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -85,6 +88,14 @@ @Autowired private ZlmHttpHookSubscribe subscribe; + private boolean taskQueueHandlerRun = false; + + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + public interface PlayMsgCallback{ void handler(ResponseSendItemMsg responseSendItemMsg) throws ParseException; @@ -100,94 +111,107 @@ @Override public void onMessage(Message message, byte[] bytes) { - JSONObject msgJSON = JSON.parseObject(message.getBody(), JSONObject.class); - WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); - if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { - return; - } - if (WvpRedisMsg.isRequest(wvpRedisMsg)) { - logger.info("[鏀跺埌REDIS閫氱煡] 璇锋眰锛� {}", new String(message.getBody())); - switch (wvpRedisMsg.getCmd()){ - case WvpRedisMsgCmd.GET_SEND_ITEM: - RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class); - requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); - break; - case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: - RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);; - requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); - break; - default: - break; - } + taskQueue.offer(message); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); + WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); + if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { + return; + } + if (WvpRedisMsg.isRequest(wvpRedisMsg)) { + logger.info("[鏀跺埌REDIS閫氱煡] 璇锋眰锛� {}", new String(msg.getBody())); - }else { - logger.info("[鏀跺埌REDIS閫氱煡] 鍥炲锛� {}", new String(message.getBody())); - switch (wvpRedisMsg.getCmd()){ - case WvpRedisMsgCmd.GET_SEND_ITEM: + switch (wvpRedisMsg.getCmd()){ + case WvpRedisMsgCmd.GET_SEND_ITEM: + RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class); + requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); + break; + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: + RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);; + requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); + break; + default: + break; + } - WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); + }else { + logger.info("[鏀跺埌REDIS閫氱煡] 鍥炲锛� {}", new String(msg.getBody())); + switch (wvpRedisMsg.getCmd()){ + case WvpRedisMsgCmd.GET_SEND_ITEM: - String key = wvpRedisMsg.getSerial(); - switch (content.getCode()) { - case 0: - ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class); - PlayMsgCallback playMsgCallback = callbacks.get(key); - if (playMsgCallback != null) { - callbacksForError.remove(key); - try { - playMsgCallback.handler(responseSendItemMsg); - } catch (ParseException e) { - throw new RuntimeException(e); + WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); + + String key = wvpRedisMsg.getSerial(); + switch (content.getCode()) { + case 0: + ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class); + PlayMsgCallback playMsgCallback = callbacks.get(key); + if (playMsgCallback != null) { + callbacksForError.remove(key); + try { + playMsgCallback.handler(responseSendItemMsg); + } catch (ParseException e) { + throw new RuntimeException(e); + } + } + break; + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: + case ERROR_CODE_OFFLINE: + case ERROR_CODE_TIMEOUT: + PlayMsgErrorCallback errorCallback = callbacksForError.get(key); + if (errorCallback != null) { + callbacks.remove(key); + errorCallback.handler(content); + } + break; + default: + break; } - } - break; - case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: - case ERROR_CODE_OFFLINE: - case ERROR_CODE_TIMEOUT: - PlayMsgErrorCallback errorCallback = callbacksForError.get(key); - if (errorCallback != null) { - callbacks.remove(key); - errorCallback.handler(content); - } - break; - default: - break; + break; + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: + WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); + String serial = wvpRedisMsg.getSerial(); + switch (wvpResult.getCode()) { + case 0: + JSONObject jsonObject = (JSONObject)wvpResult.getData(); + PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); + if (playMsgCallback != null) { + callbacksForError.remove(serial); + playMsgCallback.handler(jsonObject); + } + break; + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: + case ERROR_CODE_OFFLINE: + case ERROR_CODE_TIMEOUT: + PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); + if (errorCallback != null) { + callbacks.remove(serial); + errorCallback.handler(wvpResult); + } + break; + default: + break; + } + break; + default: + break; + } } - break; - case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: - WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); - String serial = wvpRedisMsg.getSerial(); - switch (wvpResult.getCode()) { - case 0: - JSONObject jsonObject = (JSONObject)wvpResult.getData(); - PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); - if (playMsgCallback != null) { - callbacksForError.remove(serial); - playMsgCallback.handler(jsonObject); - } - break; - case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: - case ERROR_CODE_OFFLINE: - case ERROR_CODE_TIMEOUT: - PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); - if (errorCallback != null) { - callbacks.remove(serial); - errorCallback.handler(wvpResult); - } - break; - default: - break; - } - break; - default: - break; - } + } + taskQueueHandlerRun = false; + }); } + + } /** diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java similarity index 95% rename from src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java rename to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java index 4e94d68..bb2f4ad 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java @@ -1,7 +1,6 @@ -package com.genersoft.iot.vmp.service.impl; +package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson.JSON; -import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java new file mode 100644 index 0000000..13e6874 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java @@ -0,0 +1,75 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson.JSON; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * 鎺ユ敹redis杩斿洖鐨勬帹娴佺粨鏋� + * @author lin + */ +@Component +public class RedisPushStreamResponseListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class); + + private boolean taskQueueHandlerRun = false; + + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + + private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>(); + + public interface PushStreamResponseEvent{ + void run(MessageForPushChannelResponse response); + } + + @Override + public void onMessage(Message message, byte[] bytes) { + logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛� {}", new String(message.getBody())); + taskQueue.offer(message); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); + if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ + logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�"); + return; + } + // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅 + if (responseEvents.get(response.getApp() + response.getStream()) != null) { + responseEvents.get(response.getApp() + response.getStream()).run(response); + } + } + taskQueueHandlerRun = false; + }); + } + } + + public void addEvent(String app, String stream, PushStreamResponseEvent callback) { + responseEvents.put(app + stream, callback); + } + + public void removeEvent(String app, String stream) { + responseEvents.remove(app + stream); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java new file mode 100644 index 0000000..3925836 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java @@ -0,0 +1,103 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.service.IGbStreamService; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.utils.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * @Auther: JiangFeng + * @Date: 2022/8/16 11:32 + * @Description: 鎺ユ敹redis鍙戦�佺殑鎺ㄦ祦璁惧鍒楄〃鏇存柊閫氱煡 + */ +@Component +public class RedisPushStreamStatusListMsgListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusListMsgListener.class); + @Resource + private IMediaServerService mediaServerService; + + @Resource + private IStreamPushService streamPushService; + @Resource + private IGbStreamService gbStreamService; + + private boolean taskQueueHandlerRun = false; + + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Override + public void onMessage(Message message, byte[] bytes) { + logger.info("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊]锛� {}", new String(message.getBody())); + + taskQueue.offer(message); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class); + //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀� + List<String> allAppAndStream = streamPushService.getAllAppAndStream(); + + /** + * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛� + */ + List<StreamPushItem> streamPushItemForSave = new ArrayList<>(); + List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>(); + for (StreamPushItem streamPushItem : streamPushItems) { + String app = streamPushItem.getApp(); + String stream = streamPushItem.getStream(); + boolean contains = allAppAndStream.contains(app + stream); + //涓嶅瓨鍦ㄥ氨娣诲姞 + if (!contains) { + streamPushItem.setStreamType("push"); + streamPushItem.setCreateTime(DateUtil.getNow()); + streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); + streamPushItem.setOriginType(2); + streamPushItem.setOriginTypeStr("rtsp_push"); + streamPushItem.setTotalReaderCount("0"); + streamPushItemForSave.add(streamPushItem); + } else { + //瀛樺湪灏卞彧淇敼 name鍜実bId + streamPushItemForUpdate.add(streamPushItem); + } + } + if (streamPushItemForSave.size() > 0) { + + logger.info("娣诲姞{}鏉�",streamPushItemForSave.size()); + logger.info(JSONObject.toJSONString(streamPushItemForSave)); + streamPushService.batchAdd(streamPushItemForSave); + + } + if(streamPushItemForUpdate.size()>0){ + logger.info("淇敼{}鏉�",streamPushItemForUpdate.size()); + logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); + gbStreamService.updateGbIdOrName(streamPushItemForUpdate); + } + } + taskQueueHandlerRun = false; + }); + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java similarity index 82% rename from src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java rename to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java index 50e894a..4b1c2d5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java @@ -1,24 +1,11 @@ -package com.genersoft.iot.vmp.service.impl; +package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; -import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; -import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IStreamPushService; -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto; -import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -30,8 +17,6 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -65,7 +50,6 @@ @Override public void onMessage(Message message, byte[] bytes) { - // TODO 澧炲姞闃熷垪 logger.warn("[REDIS娑堟伅-鎺ㄦ祦璁惧鐘舵�佸彉鍖朷锛� {}", new String(message.getBody())); taskQueue.offer(message); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java new file mode 100644 index 0000000..1897b6f --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java @@ -0,0 +1,91 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.conf.UserSetting; + +import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentLinkedQueue; + + +/** + * 鎺ユ敹鍏朵粬wvp鍙戦�佹祦鍙樺寲閫氱煡 + * @author lin + */ +@Component +public class RedisStreamMsgListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class); + + @Autowired + private UserSetting userSetting; + + @Autowired + private ZLMMediaListManager zlmMediaListManager; + + private boolean taskQueueHandlerRun = false; + + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Override + public void onMessage(Message message, byte[] bytes) { + + taskQueue.offer(message); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); + if (steamMsgJson == null) { + logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触"); + return; + } + String serverId = steamMsgJson.getString("serverId"); + + if (userSetting.getServerId().equals(serverId)) { + // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲 + return; + } + logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody())); + String app = steamMsgJson.getString("app"); + String stream = steamMsgJson.getString("stream"); + boolean register = steamMsgJson.getBoolean("register"); + String mediaServerId = steamMsgJson.getString("mediaServerId"); + MediaItem mediaItem = new MediaItem(); + mediaItem.setSeverId(serverId); + mediaItem.setApp(app); + mediaItem.setStream(stream); + mediaItem.setRegist(register); + mediaItem.setMediaServerId(mediaServerId); + mediaItem.setCreateStamp(System.currentTimeMillis()/1000); + mediaItem.setAliveSecond(0L); + mediaItem.setTotalReaderCount("0"); + mediaItem.setOriginType(0); + mediaItem.setOriginTypeStr("0"); + mediaItem.setOriginTypeStr("unknown"); + if (register) { + zlmMediaListManager.addPush(mediaItem); + }else { + zlmMediaListManager.removeMedia(app, stream); + } + } + taskQueueHandlerRun = false; + }); + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java index 5e74d99..8a5d455 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java @@ -75,18 +75,23 @@ "WHERE gs.gbId = '${gbId}' AND pgs.platformId = '${platformId}'") GbStream queryStreamInPlatform(String platformId, String gbId); - @Select("select gt.gbId as channelId, gt.name, 'wvp-pro' as manufacture, st.status, gt.longitude, gt.latitude, pc.id as parentId," + + @Select("<script> "+ + "select gt.gbId as channelId, gt.name, 'wvp-pro' as manufacture, st.status, gt.longitude, gt.latitude, pc.id as parentId," + " '1' as registerWay, pc.civilCode, 'live' as model, 'wvp-pro' as owner, '0' as parental,'0' as secrecy" + " from gb_stream gt " + " left join (" + - " select sp.status, sp.app, sp.stream from stream_push sp" + + " select " + + " <if test='usPushingAsStatus != true'> sp.status as status, </if>" + + " <if test='usPushingAsStatus == true'> sp.pushIng as status, </if>" + + "sp.app, sp.stream from stream_push sp" + " union all" + " select spxy.status, spxy.app, spxy.stream from stream_proxy spxy" + " ) st on st.app = gt.app and st.stream = gt.stream" + " left join platform_gb_stream pgs on gt.gbStreamId = pgs.gbStreamId" + " left join platform_catalog pc on pgs.catalogId = pc.id and pgs.platformId = pc.platformId" + - " where pgs.platformId=#{platformId}") - List<DeviceChannel> queryGbStreamListInPlatform(String platformId); + " where pgs.platformId=#{platformId}" + + "</script>") + List<DeviceChannel> queryGbStreamListInPlatform(String platformId, boolean usPushingAsStatus); @Select("SELECT gs.* FROM gb_stream gs LEFT JOIN platform_gb_stream pgs " + diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index 702b5be..f8a74fe 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -2,6 +2,7 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; @@ -82,6 +83,9 @@ @Autowired private GbStreamMapper gbStreamMapper; + + @Autowired + private UserSetting userSetting; @Autowired private PlatformCatalogMapper catalogMapper; @@ -614,7 +618,7 @@ */ @Override public List<DeviceChannel> queryGbStreamListInPlatform(String platformId) { - return gbStreamMapper.queryGbStreamListInPlatform(platformId); + return gbStreamMapper.queryGbStreamListInPlatform(platformId, userSetting.isUsePushingAsStatus()); } /** diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index 3569de2..ed025b9 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -188,6 +188,8 @@ record-sip: true # 鏄惁灏嗘棩蹇楀瓨鍌ㄨ繘鏁版嵁搴� logInDatebase: true + # 浣跨敤鎺ㄦ祦鐘舵�佷綔涓烘帹娴侀�氶亾鐘舵�� + use-pushing-as-status: true # 鍏抽棴鍦ㄧ嚎鏂囨。锛堢敓浜х幆澧冨缓璁叧闂級 springdoc: -- Gitblit v1.8.0