648540858
2023-10-31 6c13317faf11cb72d659bdedad555ac896bb8922
Merge branch 'wvp-28181-2.0' into main-dev
9个文件已修改
201 ■■■■■ 已修改文件
sql/2.6.9更新.sql 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java 109 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sql/2.6.9¸üÐÂ.sql
@@ -5,4 +5,4 @@
    add auto_push_channel bool default false
alter table wvp_stream_proxy
    add stream_key varying(255)
    add stream_key character varying(255)
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java
@@ -89,17 +89,17 @@
                ResponseEvent event = (ResponseEvent) eventResult.event;
                if (event.getResponse().getRawContent() != null) {
                    // æˆåŠŸ
                    logger.info("[取消目录订阅订阅]成功: {}", device.getDeviceId());
                    logger.info("[取消目录订阅]成功: {}", device.getDeviceId());
                }else {
                    // æˆåŠŸ
                    logger.info("[取消目录订阅订阅]成功: {}", device.getDeviceId());
                    logger.info("[取消目录订阅]成功: {}", device.getDeviceId());
                }
            },eventResult -> {
                // å¤±è´¥
                logger.warn("[取消目录订阅订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
                logger.warn("[取消目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
            });
        } catch (InvalidArgumentException | SipException | ParseException e) {
            logger.error("[命令发送失败] å–消目录订阅订阅: {}", e.getMessage());
            logger.error("[命令发送失败] å–消目录订阅: {}", e.getMessage());
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -132,7 +132,6 @@
                        if (CmdType.CATALOG.equals(cmd)) {
                            logger.info("接收到Catalog通知");
                            processNotifyCatalogList(take.getEvt());
                            notifyRequestForCatalogProcessor.process(take.getEvt());
                        } else if (CmdType.ALARM.equals(cmd)) {
                            logger.info("接收到Alarm通知");
@@ -365,114 +364,6 @@
            // å›žå¤200 OK
            if (redisCatchStorage.deviceIsOnline(deviceId)) {
                publisher.deviceAlarmEventPublish(deviceAlarm);
            }
        } catch (DocumentException e) {
            logger.error("未处理的异常 ", e);
        }
    }
    /***
     * å¤„理catalog设备目录列表Notify
     *
     * @param evt
     */
    private void processNotifyCatalogList(RequestEvent evt) {
        try {
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
            Device device = redisCatchStorage.getDevice(deviceId);
            if (device == null || !device.isOnLine()) {
                logger.warn("[收到目录订阅]:{}, ä½†æ˜¯è®¾å¤‡å·²ç»ç¦»çº¿", (device != null ? device.getDeviceId():"" ));
                return;
            }
            Element rootElement = getRootElement(evt, device.getCharset());
            if (rootElement == null) {
                logger.warn("[ æ”¶åˆ°ç›®å½•订阅 ] content cannot be null, {}", evt.getRequest());
                return;
            }
            Element deviceListElement = rootElement.element("DeviceList");
            if (deviceListElement == null) {
                return;
            }
            Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
            if (deviceListIterator != null) {
                // éåކDeviceList
                while (deviceListIterator.hasNext()) {
                    Element itemDevice = deviceListIterator.next();
                    Element channelDeviceElement = itemDevice.element("DeviceID");
                    if (channelDeviceElement == null) {
                        continue;
                    }
                    Element eventElement = itemDevice.element("Event");
                    String event;
                    if (eventElement == null) {
                        logger.warn("[收到目录订阅]:{}, ä½†æ˜¯Event为空, è®¾ä¸ºé»˜è®¤å€¼ ADD", (device != null ? device.getDeviceId():"" ));
                        event = CatalogEvent.ADD;
                    }else {
                        event = eventElement.getText().toUpperCase();
                    }
                    DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event, civilCodeFileConf);
                    if (channel == null) {
                        logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
                        continue;
                    }
                    if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
                        channel.setParentId(null);
                    }
                    channel.setDeviceId(device.getDeviceId());
                    logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
                    switch (event) {
                        case CatalogEvent.ON:
                            // ä¸Šçº¿
                            logger.info("[收到通道上线通知] æ¥è‡ªè®¾å¤‡: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            storager.deviceChannelOnline(deviceId, channel.getChannelId());
                            break;
                        case CatalogEvent.OFF :
                            // ç¦»çº¿
                            logger.info("[收到通道离线通知] æ¥è‡ªè®¾å¤‡: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                storager.deviceChannelOffline(deviceId, channel.getChannelId());
                            }else {
                                logger.info("[收到通道离线通知] ä½†æ˜¯å¹³å°å·²é…ç½®æ‹’绝此消息,来自设备: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            }
                            break;
                        case CatalogEvent.VLOST:
                            // è§†é¢‘丢失
                            logger.info("[收到通道视频丢失通知] æ¥è‡ªè®¾å¤‡: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                storager.deviceChannelOffline(deviceId, channel.getChannelId());
                            }else {
                                logger.info("[收到通道视频丢失通知] ä½†æ˜¯å¹³å°å·²é…ç½®æ‹’绝此消息,来自设备: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            }
                            break;
                        case CatalogEvent.DEFECT:
                            // æ•…éšœ
                            break;
                        case CatalogEvent.ADD:
                            // å¢žåŠ 
                            logger.info("[收到增加通道通知] æ¥è‡ªè®¾å¤‡: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            deviceChannelService.updateChannel(deviceId, channel);
                            break;
                        case CatalogEvent.DEL:
                            // åˆ é™¤
                            logger.info("[收到删除通道通知] æ¥è‡ªè®¾å¤‡: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            storager.delChannel(deviceId, channel.getChannelId());
                            break;
                        case CatalogEvent.UPDATE:
                            // æ›´æ–°
                            logger.info("[收到更新通道通知] æ¥è‡ªè®¾å¤‡: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            deviceChannelService.updateChannel(deviceId, channel);
                            break;
                        default:
                            logger.warn("[ NotifyCatalog ] event not found ï¼š {}", event );
                    }
                    // è½¬å‘变化信息
                    eventPublisher.catalogEventPublish(null, channel, event);
                }
            }
        } catch (DocumentException e) {
            logger.error("未处理的异常 ", e);
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
@@ -218,6 +218,21 @@
        }
    }
    public JSONObject isMediaOnline(MediaServerItem mediaServerItem, String app, String stream, String schema){
        Map<String, Object> param = new HashMap<>();
        if (app != null) {
            param.put("app",app);
        }
        if (stream != null) {
            param.put("stream",stream);
        }
        if (schema != null) {
            param.put("schema",schema);
        }
        param.put("vhost","__defaultVhost__");
        return sendPost(mediaServerItem, "isMediaOnline", param, null);
    }
    public JSONObject getMediaList(MediaServerItem mediaServerItem, String app, String stream, String schema, RequestCallback callback){
        Map<String, Object> param = new HashMap<>();
        if (app != null) {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
@@ -9,6 +9,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -545,16 +545,18 @@
        //  ç›®å½•订阅相关的信息
        if (device.getSubscribeCycleForCatalog() > 0) {
            if (deviceInStore.getSubscribeCycleForCatalog() == 0 || deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
                deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
        if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
            if (device.getSubscribeCycleForCatalog() > 0) {
                // è‹¥å·²å¼€å¯è®¢é˜…,但订阅周期不同,则先取消
                if (deviceInStore.getSubscribeCycleForCatalog() != 0) {
                    removeCatalogSubscribe(deviceInStore);
                }
                // å¼€å¯è®¢é˜…
                addCatalogSubscribe(deviceInStore);
            }
        }else if (device.getSubscribeCycleForCatalog() == 0) {
            if (deviceInStore.getSubscribeCycleForCatalog() != 0) {
                deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
                addCatalogSubscribe(deviceInStore);
            }else if (device.getSubscribeCycleForCatalog() == 0) {
                // å–消订阅
                deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
                removeCatalogSubscribe(deviceInStore);
            }
        }
@@ -569,6 +571,8 @@
            }
        }else if (device.getSubscribeCycleForMobilePosition() == 0) {
            if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
                deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
                deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
                // å–消订阅
                removeMobilePositionSubscribe(deviceInStore);
            }
src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java
@@ -257,7 +257,7 @@
                ":" + inviteInfo.getDeviceId() +
                ":" + inviteInfo.getChannelId() +
                ":" + inviteInfo.getStream() +
                ":" + inviteInfo.getSsrcInfo().getSsrc();
                ":" + ssrc;
        if (inviteInfoInDb.getSsrcInfo() != null) {
            inviteInfoInDb.getSsrcInfo().setSsrc(ssrc);
        }
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -35,15 +35,19 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * è§†é¢‘代理业务
@@ -554,4 +558,43 @@
        return new ResourceBaseInfo(total, online);
    }
    @Scheduled(cron = "* 0/10 * * * ?")
    public void asyncCheckStreamProxyStatus() {
        List<MediaServerItem> all = mediaServerService.getAllOnline();
        if (CollectionUtils.isEmpty(all)){
            return;
        }
        Map<String, MediaServerItem> serverItemMap = all.stream().collect(Collectors.toMap(MediaServerItem::getId, Function.identity(), (m1, m2) -> m1));
        List<StreamProxyItem> list = videoManagerStorager.getStreamProxyListForEnable(true);
        if (CollectionUtils.isEmpty(list)){
            return;
        }
        for (StreamProxyItem streamProxyItem : list) {
            MediaServerItem mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId());
            // TODO æ”¯æŒå…¶ä»– schema
            JSONObject mediaInfo = zlmresTfulUtils.isMediaOnline(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream(), "rtsp");
            if (mediaInfo == null){
                streamProxyItem.setStatus(false);
            } else {
                if (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")) {
                    streamProxyItem.setStatus(true);
                } else {
                    streamProxyItem.setStatus(false);
                }
            }
            updateStreamProxy(streamProxyItem);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -506,6 +506,9 @@
        stream.setUpdateTime(DateUtil.getNow());
        stream.setCreateTime(DateUtil.getNow());
        stream.setServerId(userSetting.getServerId());
        stream.setMediaServerId(mediaConfig.getId());
        stream.setSelf(true);
        stream.setPushIng(true);
        // æ”¾åœ¨äº‹åŠ¡å†…æ‰§è¡Œ
        boolean result = false;