From 941b9a8374c2e148795d0a41c164026aca3345d2 Mon Sep 17 00:00:00 2001 From: xiaoQQya <46475319+xiaoQQya@users.noreply.github.com> Date: 星期三, 01 十一月 2023 13:44:14 +0800 Subject: [PATCH] Merge branch '648540858:wvp-28181-2.0' into develop --- src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 3 src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java | 8 +- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java | 1 src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java | 18 ++- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java | 15 +++ sql/2.6.9更新.sql | 2 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 109 --------------------------- src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java | 43 ++++++++++ 8 files changed, 78 insertions(+), 121 deletions(-) diff --git "a/sql/2.6.9\346\233\264\346\226\260.sql" "b/sql/2.6.9\346\233\264\346\226\260.sql" index 769004d..f8f44d9 100644 --- "a/sql/2.6.9\346\233\264\346\226\260.sql" +++ "b/sql/2.6.9\346\233\264\346\226\260.sql" @@ -5,4 +5,4 @@ add auto_push_channel bool default false alter table wvp_stream_proxy - add stream_key varying(255) + add stream_key character varying(255) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java index 39dff93..2ffbfe4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java @@ -89,17 +89,17 @@ ResponseEvent event = (ResponseEvent) eventResult.event; if (event.getResponse().getRawContent() != null) { // 鎴愬姛 - logger.info("[鍙栨秷鐩綍璁㈤槄璁㈤槄]鎴愬姛锛� {}", device.getDeviceId()); + logger.info("[鍙栨秷鐩綍璁㈤槄]鎴愬姛锛� {}", device.getDeviceId()); }else { // 鎴愬姛 - logger.info("[鍙栨秷鐩綍璁㈤槄璁㈤槄]鎴愬姛锛� {}", device.getDeviceId()); + logger.info("[鍙栨秷鐩綍璁㈤槄]鎴愬姛锛� {}", device.getDeviceId()); } },eventResult -> { // 澶辫触 - logger.warn("[鍙栨秷鐩綍璁㈤槄璁㈤槄]澶辫触锛屼俊浠ゅ彂閫佸け璐ワ細 {}-{} ", device.getDeviceId(), eventResult.msg); + logger.warn("[鍙栨秷鐩綍璁㈤槄]澶辫触锛屼俊浠ゅ彂閫佸け璐ワ細 {}-{} ", device.getDeviceId(), eventResult.msg); }); } catch (InvalidArgumentException | SipException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍙栨秷鐩綍璁㈤槄璁㈤槄: {}", e.getMessage()); + logger.error("[鍛戒护鍙戦�佸け璐 鍙栨秷鐩綍璁㈤槄: {}", e.getMessage()); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index dbe49d5..d35c6a6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -132,7 +132,6 @@ if (CmdType.CATALOG.equals(cmd)) { logger.info("鎺ユ敹鍒癈atalog閫氱煡"); - processNotifyCatalogList(take.getEvt()); notifyRequestForCatalogProcessor.process(take.getEvt()); } else if (CmdType.ALARM.equals(cmd)) { logger.info("鎺ユ敹鍒癆larm閫氱煡"); @@ -365,114 +364,6 @@ // 鍥炲200 OK if (redisCatchStorage.deviceIsOnline(deviceId)) { publisher.deviceAlarmEventPublish(deviceAlarm); - } - } catch (DocumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } - } - - /*** - * 澶勭悊catalog璁惧鐩綍鍒楄〃Notify - * - * @param evt - */ - private void processNotifyCatalogList(RequestEvent evt) { - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null || !device.isOnLine()) { - logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" )); - return; - } - Element rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest()); - return; - } - Element deviceListElement = rootElement.element("DeviceList"); - if (deviceListElement == null) { - return; - } - Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - - // 閬嶅巻DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - continue; - } - Element eventElement = itemDevice.element("Event"); - String event; - if (eventElement == null) { - logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" )); - event = CatalogEvent.ADD; - }else { - event = eventElement.getText().toUpperCase(); - } - DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event, civilCodeFileConf); - if (channel == null) { - logger.info("[鏀跺埌鐩綍璁㈤槄]锛氫絾鏄В鏋愬け璐� {}", new String(evt.getRequest().getRawContent())); - continue; - } - if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { - channel.setParentId(null); - } - channel.setDeviceId(device.getDeviceId()); - logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}/{}", device.getDeviceId(), channel.getChannelId()); - switch (event) { - case CatalogEvent.ON: - // 涓婄嚎 - logger.info("[鏀跺埌閫氶亾涓婄嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - storager.deviceChannelOnline(deviceId, channel.getChannelId()); - break; - case CatalogEvent.OFF : - // 绂荤嚎 - logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - storager.deviceChannelOffline(deviceId, channel.getChannelId()); - }else { - logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - } - break; - case CatalogEvent.VLOST: - // 瑙嗛涓㈠け - logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - storager.deviceChannelOffline(deviceId, channel.getChannelId()); - }else { - logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - } - break; - case CatalogEvent.DEFECT: - // 鏁呴殰 - break; - case CatalogEvent.ADD: - // 澧炲姞 - logger.info("[鏀跺埌澧炲姞閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - deviceChannelService.updateChannel(deviceId, channel); - break; - case CatalogEvent.DEL: - // 鍒犻櫎 - logger.info("[鏀跺埌鍒犻櫎閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - storager.delChannel(deviceId, channel.getChannelId()); - break; - case CatalogEvent.UPDATE: - // 鏇存柊 - logger.info("[鏀跺埌鏇存柊閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - deviceChannelService.updateChannel(deviceId, channel); - break; - default: - logger.warn("[ NotifyCatalog ] event not found 锛� {}", event ); - - } - // 杞彂鍙樺寲淇℃伅 - eventPublisher.catalogEventPublish(null, channel, event); - - } } } catch (DocumentException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 84e9e7e..52bc902 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -215,6 +215,21 @@ } } + public JSONObject isMediaOnline(MediaServerItem mediaServerItem, String app, String stream, String schema){ + Map<String, Object> param = new HashMap<>(); + if (app != null) { + param.put("app",app); + } + if (stream != null) { + param.put("stream",stream); + } + if (schema != null) { + param.put("schema",schema); + } + param.put("vhost","__defaultVhost__"); + return sendPost(mediaServerItem, "isMediaOnline", param, null); + } + public JSONObject getMediaList(MediaServerItem mediaServerItem, String app, String stream, String schema, RequestCallback callback){ Map<String, Object> param = new HashMap<>(); if (app != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java index 6e59402..4a781f3 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java @@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.service.IMediaServerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 0d99ecc..79f2039 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -520,16 +520,18 @@ // 鐩綍璁㈤槄鐩稿叧鐨勪俊鎭� - if (device.getSubscribeCycleForCatalog() > 0) { - if (deviceInStore.getSubscribeCycleForCatalog() == 0 || deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) { - deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); + if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) { + if (device.getSubscribeCycleForCatalog() > 0) { + // 鑻ュ凡寮�鍚闃咃紝浣嗚闃呭懆鏈熶笉鍚岋紝鍒欏厛鍙栨秷 + if (deviceInStore.getSubscribeCycleForCatalog() != 0) { + removeCatalogSubscribe(deviceInStore); + } // 寮�鍚闃� - addCatalogSubscribe(deviceInStore); - } - }else if (device.getSubscribeCycleForCatalog() == 0) { - if (deviceInStore.getSubscribeCycleForCatalog() != 0) { deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); + addCatalogSubscribe(deviceInStore); + }else if (device.getSubscribeCycleForCatalog() == 0) { // 鍙栨秷璁㈤槄 + deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); removeCatalogSubscribe(deviceInStore); } } @@ -544,6 +546,8 @@ } }else if (device.getSubscribeCycleForMobilePosition() == 0) { if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) { + deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval()); + deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); // 鍙栨秷璁㈤槄 removeMobilePositionSubscribe(deviceInStore); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 7fbe769..eac543a 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -35,15 +35,19 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; +import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; /** * 瑙嗛浠g悊涓氬姟 @@ -560,4 +564,43 @@ return new ResourceBaseInfo(total, online); } + + + @Scheduled(cron = "* 0/10 * * * ?") + public void asyncCheckStreamProxyStatus() { + + List<MediaServerItem> all = mediaServerService.getAllOnline(); + + if (CollectionUtils.isEmpty(all)){ + return; + } + + Map<String, MediaServerItem> serverItemMap = all.stream().collect(Collectors.toMap(MediaServerItem::getId, Function.identity(), (m1, m2) -> m1)); + + List<StreamProxyItem> list = videoManagerStorager.getStreamProxyListForEnable(true); + + if (CollectionUtils.isEmpty(list)){ + return; + } + + for (StreamProxyItem streamProxyItem : list) { + + MediaServerItem mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId()); + + // TODO 鏀寔鍏朵粬 schema + JSONObject mediaInfo = zlmresTfulUtils.isMediaOnline(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream(), "rtsp"); + + if (mediaInfo == null){ + streamProxyItem.setStatus(false); + } else { + if (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")) { + streamProxyItem.setStatus(true); + } else { + streamProxyItem.setStatus(false); + } + } + + updateStreamProxy(streamProxyItem); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index bc34162..32e9bdb 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -506,6 +506,9 @@ stream.setUpdateTime(DateUtil.getNow()); stream.setCreateTime(DateUtil.getNow()); stream.setServerId(userSetting.getServerId()); + stream.setMediaServerId(mediaConfig.getId()); + stream.setSelf(true); + stream.setPushIng(true); // 鏀惧湪浜嬪姟鍐呮墽琛� boolean result = false; -- Gitblit v1.8.0