648540858
2021-12-13 bc0319b3f338412aa18f73bd749057e9ea3a7125
将device信息写入redis以提高sip处理速度
12个文件已修改
121 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -318,7 +318,7 @@
            } else {
                // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
                Device device = storager.queryVideoDevice(requesterId);
                Device device = redisCatchStorage.getDevice(requesterId);
                if (device != null) {
                    logger.info("收到设备" + requesterId + "的语音广播Invite请求");
                    responseAck(evt, Response.TRYING);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -112,7 +112,7 @@
            MobilePosition mobilePosition = new MobilePosition();
            Element deviceIdElement = rootElement.element("DeviceID");
            String deviceId = deviceIdElement.getTextTrim().toString();
            Device device = storager.queryVideoDevice(deviceId);
            Device device = redisCatchStorage.getDevice(deviceId);
            if (device != null) {
                if (!StringUtils.isEmpty(device.getName())) {
                    mobilePosition.setDeviceName(device.getName());
@@ -168,7 +168,7 @@
            Element deviceIdElement = rootElement.element("DeviceID");
            String deviceId = deviceIdElement.getText().toString();
            Device device = storager.queryVideoDevice(deviceId);
            Device device = redisCatchStorage.getDevice(deviceId);
            if (device == null) {
                return;
            }
@@ -235,7 +235,7 @@
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
            Element rootElement = getRootElement(evt);
            Device device = storager.queryVideoDevice(deviceId);
            Device device = redisCatchStorage.getDevice(deviceId);
            if (device == null) {
                return;
            }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
@@ -10,6 +10,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import gov.nist.javax.sip.RequestEventExt;
import gov.nist.javax.sip.address.AddressImpl;
@@ -52,6 +53,9 @@
    private RegisterLogicHandler handler;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IVideoManagerStorager storager;
    @Autowired
@@ -86,7 +90,7 @@
            AddressImpl address = (AddressImpl) fromHeader.getAddress();
            SipUri uri = (SipUri) address.getURI();
            String deviceId = uri.getUser();
            Device device = storager.queryVideoDevice(deviceId);
            Device device = redisCatchStorage.getDevice(deviceId);
            AuthorizationHeader authorhead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME); 
            // 校验密码是否正确
            if (authorhead != null) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java
@@ -6,6 +6,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.dom4j.DocumentException;
import org.dom4j.Element;
@@ -38,6 +39,9 @@
    @Autowired
    private IVideoManagerStorager storage;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
@@ -53,7 +57,7 @@
        logger.debug("接收到消息:" + evt.getRequest());
        String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
        // 查询设备是否存在
        Device device = storage.queryVideoDevice(deviceId);
        Device device = redisCatchStorage.getDevice(deviceId);
        // 查询上级平台是否存在
        ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(deviceId);
        try {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -467,7 +467,7 @@
                if (s.length == 2) {
                    String deviceId = s[0];
                    String channelId = s[1];
                    Device device = storager.queryVideoDevice(deviceId);
                    Device device = redisCatchStorage.getDevice(deviceId);
                    if (device != null) {
                        UUID uuid = UUID.randomUUID();
                        SSRCInfo ssrcInfo;
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -63,4 +63,6 @@
    void zlmServerOffline(String mediaServerId);
    void clean();
    /**
     * Saves the stored push streams as GB streams, giving each one a generated GB id.
     * NOTE(review): despite the name, the StreamPushServiceImpl implementation in this change
     * generates sequential ids (fixed prefix plus an incrementing counter), not random ones.
     *
     * @return {@code true} when the operation has completed
     */
    boolean saveToRandomGB();
}
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -96,7 +96,7 @@
            resultHolder.invokeResult(msg);
            return playResult;
        }
        Device device = storager.queryVideoDevice(deviceId);
        Device device = redisCatchStorage.getDevice(deviceId);
        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
        playResult.setDevice(device);
        // 超时处理
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -255,4 +255,30 @@
    public void clean() {
        // No-op: this implementation currently performs no work.
    }
    @Override
    public boolean saveToRandomGB() {
        List<StreamPushItem> streamPushItems = streamPushMapper.selectAll();
        long gbId = 100001;
        for (StreamPushItem streamPushItem : streamPushItems) {
            streamPushItem.setStreamType("push");
            streamPushItem.setStatus(true);
            streamPushItem.setGbId("34020000004111" + gbId);
            gbId ++;
        }
        int  limitCount = 30;
        if (streamPushItems.size() > limitCount) {
            for (int i = 0; i < streamPushItems.size(); i += limitCount) {
                int toIndex = i + limitCount;
                if (i + limitCount > streamPushItems.size()) {
                    toIndex = streamPushItems.size();
                }
                gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex));
            }
        }else {
            gbStreamMapper.batchAdd(streamPushItems);
        }
        return true;
    }
}
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -2,6 +2,7 @@
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
@@ -169,4 +170,15 @@
    ThirdPartyGB queryMemberNoGBId(String queryKey);
    List<StreamInfo> getStreams(String mediaServerId, String pull);
    /**
     * Writes the device information to Redis.
     *
     * @param device the device to store
     */
    void updateDevice(Device device);
    /**
     * Gets the Device stored in Redis for the given device id.
     *
     * @param deviceId id of the device to look up
     * @return the stored Device. NOTE(review): callers null-check the result, so presumably
     *         {@code null} when nothing is stored for the id — confirm against the Redis helper
     */
    Device getDevice(String deviceId);
}
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -2,6 +2,7 @@
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
@@ -79,4 +80,17 @@
            "</foreach>" +
            "</script>")
    void batchDel(List<StreamProxyItem> streamProxyItemList);
    @Insert("<script> " +
            "insert into gb_stream " +
            "(app, stream, gbId, name, " +
            "longitude, latitude, streamType, mediaServerId, status)" +
            "values " +
            "<foreach collection='subList' index='index' item='item' separator=','> " +
            "('${item.app}', '${item.stream}', '${item.gbId}', '${item.name}', " +
            "'${item.longitude}', '${item.latitude}', '${item.streamType}', " +
            "'${item.mediaServerId}', ${item.status}) "+
            "</foreach> " +
            "</script>")
    void batchAdd(List<StreamPushItem> subList);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -377,4 +377,16 @@
        }
        return result;
    }
    @Override
    public void updateDevice(Device device) {
        String key = VideoManagerConstants.DEVICE_PREFIX + userSetup.getServerId() + "_" + device.getDeviceId();
        redis.set(key, device);
    }
    @Override
    public Device getDevice(String deviceId) {
        String key = VideoManagerConstants.DEVICE_PREFIX + userSetup.getServerId() + "_" + deviceId;
        return (Device)redis.get(key);
    }
}
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
@@ -110,6 +110,7 @@
     */
    @Override
    public synchronized boolean create(Device device) {
        redisCatchStorage.updateDevice(device);
        return deviceMapper.add(device) > 0;
    }
@@ -128,10 +129,13 @@
        Device deviceByDeviceId = deviceMapper.getDeviceByDeviceId(device.getDeviceId());
        if (deviceByDeviceId == null) {
            device.setCreateTime(now);
            redisCatchStorage.updateDevice(device);
            return deviceMapper.add(device) > 0;
        }else {
            redisCatchStorage.updateDevice(device);
            return deviceMapper.update(device) > 0;
        }
    }
@@ -185,11 +189,32 @@
                    }
                }
            }
            int limitCount = 300;
            if (addChannels.size() > 0) {
                deviceChannelMapper.batchAdd(addChannels);
                if (addChannels.size() > limitCount) {
                    for (int i = 0; i < addChannels.size(); i += limitCount) {
                        int toIndex = i + limitCount;
                        if (i + limitCount > addChannels.size()) {
                            toIndex = addChannels.size();
                        }
                        deviceChannelMapper.batchAdd(addChannels.subList(i, toIndex));
                    }
                }else {
                    deviceChannelMapper.batchAdd(addChannels);
                }
            }
            if (updateChannels.size() > 0) {
                deviceChannelMapper.batchUpdate(updateChannels);
                if (updateChannels.size() > limitCount) {
                    for (int i = 0; i < updateChannels.size(); i += limitCount) {
                        int toIndex = i + limitCount;
                        if (i + limitCount > updateChannels.size()) {
                            toIndex = updateChannels.size();
                        }
                        deviceChannelMapper.batchAdd(updateChannels.subList(i, toIndex));
                    }
                }else {
                    deviceChannelMapper.batchUpdate(updateChannels);
                }
            }
        }
    }
@@ -322,6 +347,7 @@
        }
        device.setOnline(1);
        logger.info("更新设备在线: " + deviceId);
        redisCatchStorage.updateDevice(device);
        return deviceMapper.update(device) > 0;
    }
@@ -337,6 +363,7 @@
        Device device = deviceMapper.getDeviceByDeviceId(deviceId);
        if (device == null) return false;
        device.setOnline(0);
        redisCatchStorage.updateDevice(device);
        return deviceMapper.update(device) > 0;
    }