pom.xml
@@ -99,6 +99,13 @@ <version>8.0.22</version> </dependency> <!-- 添加sqlite-jdbc数据库驱动 --> <dependency> <groupId>org.xerial</groupId> <artifactId>sqlite-jdbc</artifactId> <version>3.32.3.2</version> </dependency> <!--Mybatis --> <dependency> <groupId>org.mybatis</groupId> src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -138,16 +138,25 @@ // TODO Auto-generated catch block e.printStackTrace(); } if (evt.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { CallIdHeader callIdHeader = (CallIdHeader)evt.getResponse().getHeader(CallIdHeader.NAME); if (callIdHeader != null) { SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); if (subscribe != null) { subscribe.response(evt); } } } // } else if (status == Response.TRYING) { // trying不会回复 } else if ((status >= 100) && (status < 200)) { // 增加其它无需回复的响应,如101、180等 } else { logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/); if (evt.getResponse() != null && sipSubscribe.getSize() > 0 ) { if (evt.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) { CallIdHeader callIdHeader = (CallIdHeader)evt.getResponse().getHeader(CallIdHeader.NAME); if (callIdHeader != null) { SipSubscribe.Event subscribe = sipSubscribe.getSubscribe(callIdHeader.getCallId()); SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); if (subscribe != null) { subscribe.response(evt); } src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java
@@ -1,10 +1,16 @@ package com.genersoft.iot.vmp.gb28181.bean; import java.util.Date; import java.util.List; import java.util.Map; public class Device { /** * 数据库存储ID */ private int id; /** * 设备Id @@ -55,14 +61,24 @@ */ private int online; /** * 通道列表 */ // private Map<String,DeviceChannel> channelMap; /** * 注册时间 */ private Long registerTimeMillis; /** * 通道个数 */ private int channelCount; private List<String> channelList; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getDeviceId() { return deviceId; @@ -144,11 +160,11 @@ this.channelCount = channelCount; } public List<String> getChannelList() { return channelList; public Long getRegisterTimeMillis() { return registerTimeMillis; } public void setChannelList(List<String> channelList) { this.channelList = channelList; public void setRegisterTimeMillis(Long registerTimeMillis) { this.registerTimeMillis = registerTimeMillis; } } src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java
@@ -2,10 +2,17 @@ public class DeviceChannel { /** * 通道id */ private String channelId; /** * 设备id */ private String deviceId; /** * 通道名 @@ -146,13 +153,15 @@ /** * 是否含有音频 */ private boolean hasAudio; private boolean hasAudio; /** * 是否正在播放 */ private boolean play; public String getDeviceId() { return deviceId; } public void setDeviceId(String deviceId) { this.deviceId = deviceId; } public void setPTZType(int PTZType) { this.PTZType = PTZType; @@ -385,14 +394,6 @@ public void setHasAudio(boolean hasAudio) { this.hasAudio = hasAudio; } public boolean isPlay() { return play; } public void setPlay(boolean play) { this.play = play; } public String getStreamId() { src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java
@@ -17,21 +17,34 @@ private final static Logger logger = LoggerFactory.getLogger(SipSubscribe.class); private Map<String, SipSubscribe.Event> allSubscribes = new ConcurrentHashMap<>(); private Map<String, SipSubscribe.Event> errorSubscribes = new ConcurrentHashMap<>(); private Map<String, SipSubscribe.Event> okSubscribes = new ConcurrentHashMap<>(); public interface Event { void response(ResponseEvent event); } public void addSubscribe(String key, SipSubscribe.Event event) { allSubscribes.put(key, event); public void addErrorSubscribe(String key, SipSubscribe.Event event) { errorSubscribes.put(key, event); } public SipSubscribe.Event getSubscribe(String key) { return allSubscribes.get(key); public void addOkSubscribe(String key, SipSubscribe.Event event) { okSubscribes.put(key, event); } public int getSize(){ return allSubscribes.size(); public SipSubscribe.Event getErrorSubscribe(String key) { return errorSubscribes.get(key); } public SipSubscribe.Event getOkSubscribe(String key) { return okSubscribes.get(key); } public int getErrorSubscribesSize(){ return errorSubscribes.size(); } public int getOkSubscribesSize(){ return okSubscribes.size(); } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
@@ -4,13 +4,10 @@ import javax.sip.ResponseEvent; import javax.sip.SipProvider; import javax.sip.header.CSeqHeader; import javax.sip.header.CallIdHeader; import javax.sip.header.Header; import javax.sip.message.Request; import javax.sip.message.Response; import com.alibaba.fastjson.JSON; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -59,6 +56,9 @@ @Autowired private IVideoManagerStorager storager; @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private EventPublisher publisher; @@ -143,6 +143,7 @@ processor.setOffLineDetector(offLineDetector); processor.setCmder(cmder); processor.setStorager(storager); processor.setRedisCatchStorage(redisCatchStorage); return processor; } else { return new OtherRequestProcessor(); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java
@@ -25,6 +25,8 @@ public static final String CALLBACK_CMD_PlAY = "CALLBACK_PLAY"; public static final String CALLBACK_CMD_STOP = "CALLBACK_STOP"; private Map<String, DeferredResult> map = new ConcurrentHashMap<String, DeferredResult>(); public void put(String key, DeferredResult result) { src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
@@ -101,8 +101,9 @@ * * @param ssrc ssrc */ void streamByeCmd(String ssrc, SipSubscribe.Event okEvent); void streamByeCmd(String ssrc); /** * 语音广播 * src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import java.text.ParseException; import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -12,11 +13,13 @@ import javax.sip.message.Request; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaServerConfig; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +56,9 @@ @Autowired private IVideoManagerStorager storager; @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired @Qualifier(value="tcpSipProvider") @@ -229,7 +235,7 @@ Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "ViaPtzBranch", "FromPtzTag", "ToPtzTag"); transmitRequest(device, request, null); transmitRequest(device, request); return true; } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); @@ -264,7 +270,7 @@ ptzXml.append("</Control>\r\n"); Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "ViaPtzBranch", "FromPtzTag", "ToPtzTag"); transmitRequest(device, request, null); transmitRequest(device, request); return true; } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); @@ -291,7 +297,7 @@ streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase(); } String streamMode = device.getStreamMode().toUpperCase(); MediaServerConfig mediaInfo = storager.getMediaInfo(); MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); if (mediaInfo == null) { logger.warn("点播时发现ZLM尚未连接..."); return; @@ -344,6 
+350,9 @@ } content.append("y="+ssrc+"\r\n");//ssrc // String fromTag = UUID.randomUUID().toString(); // Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, fromTag, null, ssrc); Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, "live", null, ssrc); ClientTransaction transaction = transmitRequest(device, request, errorEvent); @@ -372,7 +381,7 @@ public void playbackStreamCmd(Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event , SipSubscribe.Event errorEvent) { try { MediaServerConfig mediaInfo = storager.getMediaInfo(); MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); String ssrc = streamSession.createPlayBackSsrc(); String streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase(); // 添加订阅 @@ -457,17 +466,28 @@ e.printStackTrace(); } } /** * 视频流停止 * */ @Override public void streamByeCmd(String streamId) { public void streamByeCmd(String ssrc) { streamByeCmd(ssrc, null); } @Override public void streamByeCmd(String streamId, SipSubscribe.Event okEvent) { try { ClientTransaction transaction = streamSession.get(streamId); // 服务重启后 if (transaction == null) { StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); if (streamInfo != null) { } return; } @@ -475,6 +495,9 @@ if (dialog == null) { return; } Request byeRequest = dialog.createRequest(Request.BYE); SipURI byeURI = (SipURI) byeRequest.getRequestURI(); String vh = transaction.getRequest().getHeader(ViaHeader.NAME).toString(); @@ -491,7 +514,14 @@ } else if("UDP".equals(protocol)) { clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest); } CallIdHeader callIdHeader = (CallIdHeader) byeRequest.getHeader(CallIdHeader.NAME); if (okEvent != null) { sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent); } dialog.sendRequest(clientTransaction); streamSession.remove(streamId); 
zlmrtpServerFactory.closeRTPServer(streamId); } catch (TransactionDoesNotExistException e) { @@ -612,7 +642,7 @@ Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "ViaDeviceInfoBranch", "FromDeviceInfoTag", "ToDeviceInfoTag"); transmitRequest(device, request, null); transmitRequest(device, request); } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); @@ -676,7 +706,7 @@ Request request = headerProvider.createMessageRequest(device, recordInfoXml.toString(), "ViaRecordInfoBranch", "FromRecordInfoTag", null); transmitRequest(device, request, null); transmitRequest(device, request); } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); return false; @@ -727,8 +757,16 @@ // TODO Auto-generated method stub return false; } private ClientTransaction transmitRequest(Device device, Request request) throws SipException { return transmitRequest(device, request, null, null); } private ClientTransaction transmitRequest(Device device, Request request, SipSubscribe.Event errorEvent) throws SipException { return transmitRequest(device, request, errorEvent, null); } private ClientTransaction transmitRequest(Device device, Request request, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws SipException { ClientTransaction clientTransaction = null; if("TCP".equals(device.getTransport())) { clientTransaction = tcpSipProvider.getNewClientTransaction(request); @@ -736,10 +774,14 @@ clientTransaction = udpSipProvider.getNewClientTransaction(request); } // 添加订阅 CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); // 添加错误订阅 if (errorEvent != null) { CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); sipSubscribe.addSubscribe(callIdHeader.getCallId(), errorEvent); sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), errorEvent); } // 添加订阅 if (okEvent != null) { 
sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent); } clientTransaction.sendRequest(); @@ -747,6 +789,8 @@ } @Override public void closeRTPServer(Device device, String channelId) { if (rtpEnable) { src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java
@@ -10,6 +10,7 @@ import javax.sip.message.Request; import javax.sip.message.Response; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -47,6 +48,8 @@ private SIPCommander cmder; private IVideoManagerStorager storager; private IRedisCatchStorage redisCatchStorage; private EventPublisher publisher; @@ -451,9 +454,9 @@ String NotifyType =XmlUtil.getText(rootElement, "NotifyType"); if (NotifyType.equals("121")){ logger.info("媒体播放完毕,通知关流"); StreamInfo streamInfo = storager.queryPlaybackByDevice(deviceId, "*"); StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, "*"); if (streamInfo != null) { storager.stopPlayback(streamInfo); redisCatchStorage.stopPlayback(streamInfo); cmder.streamByeCmd(streamInfo.getStreamId()); } } @@ -507,4 +510,11 @@ this.offLineDetector = offLineDetector; } public IRedisCatchStorage getRedisCatchStorage() { return redisCatchStorage; } public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { this.redisCatchStorage = redisCatchStorage; } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/RegisterRequestProcessor.java
@@ -141,9 +141,15 @@ // 下发catelog查询目录 if (registerFlag == 1 && device != null) { logger.info("注册成功! deviceId:" + device.getDeviceId()); boolean exists = storager.exists(device.getDeviceId()); device.setRegisterTimeMillis(System.currentTimeMillis()); storager.updateDevice(device); publisher.onlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_ONLINE_REGISTER); handler.onRegister(device); // 只有第一次注册才更新通道 if (!exists) { handler.onRegister(device); } } else if (registerFlag == 2) { logger.info("注销成功! deviceId:" + device.getDeviceId()); publisher.outlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_OUTLINE_UNREGISTER); src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHTTPProxyController.java
@@ -2,6 +2,7 @@ import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.MediaServerConfig; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +30,9 @@ @Autowired private IVideoManagerStorager storager; @Autowired private IRedisCatchStorage redisCatchStorage; @Value("${media.port}") private int mediaHttpPort; @@ -36,10 +40,10 @@ @RequestMapping(value = "/**/**/**", produces = "application/json;charset=UTF-8") public Object proxy(HttpServletRequest request, HttpServletResponse response){ if (storager.getMediaInfo() == null) { if (redisCatchStorage.getMediaInfo() == null) { return "未接入流媒体"; } MediaServerConfig mediaInfo = storager.getMediaInfo(); MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); String requestURI = String.format("http://%s:%s%s?%s&%s", mediaInfo.getLocalIP(), mediaHttpPort, src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaServerConfig; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.utils.IpUtil; import com.genersoft.iot.vmp.vmanager.service.IPlayService; @@ -51,6 +52,9 @@ @Autowired private IVideoManagerStorager storager; @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private ZLMRESTfulUtils zlmresTfulUtils; @@ -249,13 +253,13 @@ String app = json.getString("app"); String streamId = json.getString("stream"); boolean regist = json.getBoolean("regist"); StreamInfo streamInfo = storager.queryPlayByStreamId(streamId); StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); if ("rtp".equals(app) && !regist ) { if (streamInfo!=null){ storager.stopPlay(streamInfo); redisCatchStorage.stopPlay(streamInfo); }else{ streamInfo = storager.queryPlaybackByStreamId(streamId); storager.stopPlayback(streamInfo); streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); redisCatchStorage.stopPlayback(streamInfo); } } @@ -281,12 +285,12 @@ String streamId = json.getString("stream"); cmder.streamByeCmd(streamId); StreamInfo streamInfo = storager.queryPlayByStreamId(streamId); StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); if (streamInfo!=null){ storager.stopPlay(streamInfo); redisCatchStorage.stopPlay(streamInfo); }else{ streamInfo = storager.queryPlaybackByStreamId(streamId); storager.stopPlayback(streamInfo); streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); redisCatchStorage.stopPlayback(streamInfo); } JSONObject ret = new JSONObject(); @@ -311,7 +315,7 @@ if (autoApplyPlay) { String app = json.getString("app"); String streamId = json.getString("stream"); StreamInfo streamInfo = storager.queryPlayByStreamId(streamId); StreamInfo streamInfo = 
redisCatchStorage.queryPlayByStreamId(streamId); if ("rtp".equals(app) && streamId.indexOf("gb_play") > -1 && streamInfo == null) { String[] s = streamId.split("_"); if (s.length == 4) { @@ -355,7 +359,7 @@ // MediaServerConfig mediaServerConfig = mediaServerConfigs.get(0); MediaServerConfig mediaServerConfig = JSON.toJavaObject(json, MediaServerConfig.class); mediaServerConfig.setLocalIP(mediaIp); storager.updateMediaInfo(mediaServerConfig); redisCatchStorage.updateMediaInfo(mediaServerConfig); // TODO Auto-generated method stub JSONObject ret = new JSONObject(); src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.MediaServerConfig; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import okhttp3.*; import org.slf4j.Logger; @@ -29,6 +30,9 @@ @Autowired private IVideoManagerStorager storager; @Autowired private IRedisCatchStorage redisCatchStorage; @Value("${media.ip}") private String mediaIp; @@ -69,7 +73,7 @@ logger.info("zlm接入成功..."); if (autoConfig) saveZLMConfig(); mediaServerConfig = getMediaServerConfig(); storager.updateMediaInfo(mediaServerConfig); redisCatchStorage.updateMediaInfo(mediaServerConfig); } } src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
New file @@ -0,0 +1,58 @@
package com.genersoft.iot.vmp.storager;

import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaServerConfig;

import java.util.Map;

/**
 * Storage contract for transient streaming state: live-play sessions,
 * playback sessions and the media server configuration.
 *
 * <p>The meaning of the {@code boolean} results is defined by the implementation;
 * presumably {@code true} means the underlying store accepted the operation.
 */
public interface IRedisCatchStorage {

    // ------------------------------------------------------------------
    // Media server configuration
    // ------------------------------------------------------------------

    /**
     * Updates the stored media server information.
     *
     * @param mediaServerConfig configuration to store
     * @return result flag of the store operation
     */
    boolean updateMediaInfo(MediaServerConfig mediaServerConfig);

    /**
     * Fetches the stored media server information.
     *
     * @return the stored configuration
     */
    MediaServerConfig getMediaInfo();

    // ------------------------------------------------------------------
    // Live play sessions
    // ------------------------------------------------------------------

    /**
     * Stores the stream when live play starts.
     *
     * @param stream stream information
     * @return result flag of the store operation
     */
    boolean startPlay(StreamInfo stream);

    /**
     * Deletes the stream when live play stops.
     *
     * @param streamInfo stream to remove
     * @return result flag of the delete operation
     */
    boolean stopPlay(StreamInfo streamInfo);

    /**
     * Queries the stored live-play entry that corresponds to the given stream.
     *
     * @param streamInfo stream whose stored entry is wanted
     * @return the stored stream information
     */
    StreamInfo queryPlay(StreamInfo streamInfo);

    /**
     * Queries a live-play entry by stream id.
     *
     * @param steamId stream id (parameter name kept as declared)
     * @return the matching stream information
     */
    StreamInfo queryPlayByStreamId(String steamId);

    /**
     * Queries a live-play entry by device and code.
     *
     * @param deviceId device id
     * @param code     presumably the channel id of the device - TODO confirm
     * @return the matching stream information
     */
    StreamInfo queryPlayByDevice(String deviceId, String code);

    /**
     * Queries all live-play entries of one device.
     *
     * @param deviceId device id
     * @return stream information per entry, keyed by a string chosen by the implementation
     */
    Map<String, StreamInfo> queryPlayByDeviceId(String deviceId);

    // ------------------------------------------------------------------
    // Playback sessions
    // ------------------------------------------------------------------

    /**
     * Stores the stream when playback starts.
     *
     * @param stream stream information
     * @return result flag of the store operation
     */
    boolean startPlayback(StreamInfo stream);

    /**
     * Deletes the stream when playback stops.
     *
     * @param streamInfo stream to remove
     * @return result flag of the delete operation
     */
    boolean stopPlayback(StreamInfo streamInfo);

    /**
     * Queries a playback entry by stream id.
     *
     * @param steamId stream id (parameter name kept as declared)
     * @return the matching stream information
     */
    StreamInfo queryPlaybackByStreamId(String steamId);

    /**
     * Queries a playback entry by device and code.
     *
     * @param deviceId device id
     * @param code     presumably the channel id of the device - TODO confirm
     * @return the matching stream information
     */
    StreamInfo queryPlaybackByDevice(String deviceId, String code);
}
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java
@@ -17,19 +17,6 @@ */ public interface IVideoManagerStorager { /** * 更新流媒体信息 * @param mediaServerConfig * @return */ public boolean updateMediaInfo(MediaServerConfig mediaServerConfig); /** * 获取流媒体信息 * @return */ public MediaServerConfig getMediaInfo(); /** * 根据设备ID判断设备是否存在 * @@ -106,10 +93,9 @@ /** * 获取多个设备 * * @param deviceIds 设备ID数组 * @return List<Device> 设备对象数组 */ public List<Device> queryVideoDeviceList(String[] deviceIds); public List<Device> queryVideoDeviceList(); /** * 删除设备 @@ -135,27 +121,6 @@ */ public boolean outline(String deviceId); /** * 开始播放时将流存入 * * @param stream 流信息 * @return */ public boolean startPlay(StreamInfo stream); /** * 停止播放时删除 * * @return */ public boolean stopPlay(StreamInfo streamInfo); /** * 查找视频流 * * @return */ public StreamInfo queryPlay(StreamInfo streamInfo); /** * 查询子设备 @@ -168,10 +133,6 @@ */ PageResult querySubChannels(String deviceId, String channelId, String query, Boolean hasSubChannel, String online, int page, int count); /** * 更新缓存 */ public void updateCatch(); /** * 清空通道 @@ -179,17 +140,4 @@ */ void cleanChannelsForDevice(String deviceId); StreamInfo queryPlayByStreamId(String streamId); StreamInfo queryPlayByDevice(String deviceId, String code); Map<String, StreamInfo> queryPlayByDeviceId(String deviceId); boolean startPlayback(StreamInfo streamInfo); boolean stopPlayback(StreamInfo streamInfo); StreamInfo queryPlaybackByDevice(String deviceId, String channelId); StreamInfo queryPlaybackByStreamId(String streamId); } src/main/java/com/genersoft/iot/vmp/storager/VideoManagerStoragerFactory.java
File was deleted src/main/java/com/genersoft/iot/vmp/storager/VodeoMannagerTask.java
@@ -8,10 +8,10 @@ public class VodeoMannagerTask implements CommandLineRunner { @Autowired private IVideoManagerStorager storager; private IVideoManagerStorager redisStorager; @Override public void run(String... strings) throws Exception { storager.updateCatch(); redisStorager.updateCatch(); } } src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
New file @@ -0,0 +1,20 @@
package com.genersoft.iot.vmp.storager.dao;

import com.genersoft.iot.vmp.common.PageResult;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import org.apache.ibatis.annotations.Mapper;

import java.util.List;

/**
 * MyBatis mapper for {@link DeviceChannel} records.
 *
 * <p>NOTE(review): none of the methods carries an SQL annotation here, so the
 * statements are presumably bound elsewhere (e.g. an XML mapper) - confirm.
 */
@Mapper
public interface DeviceChannelMapper {

    /**
     * Looks up a single channel of a device.
     *
     * <p>NOTE(review): two parameters without {@code @Param}; how the statement
     * refers to them depends on the MyBatis/compiler configuration - confirm.
     *
     * @param deviceId  device id
     * @param channelId channel id
     * @return the matching channel
     */
    DeviceChannel queryChannel(String deviceId, String channelId);

    /**
     * Lists the channels of a device.
     *
     * <p>NOTE(review): this method name is overloaded below. MyBatis derives the
     * statement id from the method name, so overloads usually collide - confirm
     * that both variants can actually be bound.
     *
     * @param deviceId device id
     * @return channels of the device
     */
    List<DeviceChannel> queryChannelsByDeviceId(String deviceId);

    /**
     * Lists the channels of a device below a given parent channel.
     *
     * @param deviceId        device id
     * @param parentChannelId id of the parent channel
     * @return channels of the device under the parent channel
     */
    List<DeviceChannel> queryChannelsByDeviceId(String deviceId, String parentChannelId);

    /**
     * Writes the given channel.
     *
     * @param channel channel to persist
     * @return presumably the number of affected rows - TODO confirm
     */
    int update(DeviceChannel channel);

    /**
     * Removes the channels of a device.
     *
     * @param deviceId device id
     * @return presumably the number of affected rows - TODO confirm
     */
    int cleanChannelsByDeviceId(String deviceId);
}
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java
New file @@ -0,0 +1,24 @@
package com.genersoft.iot.vmp.storager.dao;

import com.genersoft.iot.vmp.gb28181.bean.Device;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;

import java.util.List;

/**
 * MyBatis mapper for {@link Device} records stored in the {@code device} table.
 */
@Mapper
public interface DeviceMapper {

    /**
     * Selects the row of the {@code device} table whose {@code deviceId} column
     * equals the given id.
     *
     * @param deviceId device id to look up
     * @return the mapped device
     */
    @Select("SELECT * FROM device WHERE deviceId = #{deviceId}")
    Device getDeviceByDeviceId(String deviceId);

    /**
     * Intended to add a device.
     *
     * <p>NOTE(review): the statement below is a SELECT although it is declared
     * with {@code @Insert}; as written it cannot insert a row. It looks like a
     * copy of the query above - replace it with the real INSERT once the
     * {@code device} table columns are confirmed.
     *
     * @param device device to add
     * @return presumably the number of affected rows - TODO confirm
     */
    @Insert("SELECT * FROM device WHERE deviceId = #{deviceId}")
    int add(Device device);

    // NOTE(review): the three methods below carry no SQL annotation here;
    // presumably their statements are bound elsewhere (e.g. an XML mapper) - confirm.

    /**
     * Updates a device.
     *
     * @param device device to update
     * @return presumably the number of affected rows - TODO confirm
     */
    int update(Device device);

    /**
     * Lists devices.
     *
     * @return the devices
     */
    List<Device> getDevices();

    /**
     * Deletes a device.
     *
     * @param deviceId id of the device to delete
     * @return presumably the number of affected rows - TODO confirm
     */
    int del(String deviceId);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
New file @@ -0,0 +1,172 @@
package com.genersoft.iot.vmp.storager.impl;

import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

/**
 * {@link IRedisCatchStorage} implementation that keeps live-play sessions,
 * playback sessions and the media server configuration in Redis.
 *
 * <p>Session keys have the form {@code PREFIX_streamId_deviceId_channelId},
 * where the prefix is upper-cased by the {@code %S} format conversion. Lookups
 * by partial information scan Redis with {@code *} in the unknown positions.
 */
@Component
public class RedisCatchStorageImpl implements IRedisCatchStorage {

    @Autowired
    private RedisUtil redis;

    @Autowired
    private DeviceMapper deviceMapper;

    @Autowired
    private DeviceChannelMapper deviceChannelMapper;

    /**
     * Stores the stream in Redis when live play starts.
     *
     * @param stream stream information; must not be {@code null}
     * @return the result of the Redis write
     */
    @Override
    public boolean startPlay(StreamInfo stream) {
        return redis.set(sessionKey(VideoManagerConstants.PLAYER_PREFIX, stream), stream);
    }

    /**
     * Removes the stream from Redis when live play stops and clears the stream
     * reference on the corresponding channel record.
     *
     * @param streamInfo stream to remove; {@code null} is tolerated
     * @return {@code false} when {@code streamInfo} is {@code null}, otherwise the result of the Redis delete
     */
    @Override
    public boolean stopPlay(StreamInfo streamInfo) {
        if (streamInfo == null) {
            return false;
        }
        detachStreamFromChannel(streamInfo);
        return redis.del(sessionKey(VideoManagerConstants.PLAYER_PREFIX, streamInfo));
    }

    /**
     * Reads the live-play entry stored under the key derived from the given stream.
     *
     * @param streamInfo stream whose stream id, device id and channel id form the key
     * @return the value stored under that key
     */
    @Override
    public StreamInfo queryPlay(StreamInfo streamInfo) {
        return (StreamInfo) redis.get(sessionKey(VideoManagerConstants.PLAYER_PREFIX, streamInfo));
    }

    /**
     * Finds a live-play entry by stream id.
     *
     * @param steamId stream id
     * @return the first matching entry, or {@code null} when the scan finds no key
     */
    @Override
    public StreamInfo queryPlayByStreamId(String steamId) {
        return firstMatch(String.format("%S_%s_*", VideoManagerConstants.PLAYER_PREFIX, steamId));
    }

    /**
     * Finds a playback entry by stream id.
     *
     * @param steamId stream id
     * @return the first matching entry, or {@code null} when the scan finds no key
     */
    @Override
    public StreamInfo queryPlaybackByStreamId(String steamId) {
        return firstMatch(String.format("%S_%s_*", VideoManagerConstants.PLAY_BLACK_PREFIX, steamId));
    }

    /**
     * Finds a live-play entry by device id and channel code.
     *
     * @param deviceId device id
     * @param code     value matched against the channel-id part of the key
     * @return the first matching entry, or {@code null} when the scan finds no key
     */
    @Override
    public StreamInfo queryPlayByDevice(String deviceId, String code) {
        return firstMatch(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX, deviceId, code));
    }

    /**
     * Updates the media server information stored in Redis.
     *
     * @param mediaServerConfig configuration to store
     * @return the result of the Redis write
     */
    @Override
    public boolean updateMediaInfo(MediaServerConfig mediaServerConfig) {
        return redis.set(VideoManagerConstants.MEDIA_SERVER_PREFIX, mediaServerConfig);
    }

    /**
     * Reads the media server information stored in Redis.
     *
     * @return the stored configuration
     */
    @Override
    public MediaServerConfig getMediaInfo() {
        return (MediaServerConfig) redis.get(VideoManagerConstants.MEDIA_SERVER_PREFIX);
    }

    /**
     * Collects all live-play entries of one device.
     *
     * @param deviceId device id
     * @return entries keyed by {@code deviceId_channelId}; empty when nothing matches, never {@code null}
     */
    @Override
    public Map<String, StreamInfo> queryPlayByDeviceId(String deviceId) {
        Map<String, StreamInfo> streamInfos = new HashMap<>();
        // The device id is formatted with %s, exactly as it is when the key is
        // written; upper-casing it (%S) would miss ids containing lower-case letters.
        List<Object> keys = redis.scan(String.format("%S_*_%s_*", VideoManagerConstants.PLAYER_PREFIX, deviceId));
        if (keys == null) {
            return streamInfos;
        }
        for (Object key : keys) {
            StreamInfo streamInfo = (StreamInfo) redis.get((String) key);
            if (streamInfo == null) {
                // The key may have been removed (e.g. by stopPlay) between scan and get.
                continue;
            }
            streamInfos.put(streamInfo.getDeviceID() + "_" + streamInfo.getCahnnelId(), streamInfo);
        }
        return streamInfos;
    }

    /**
     * Stores the stream in Redis when playback starts.
     *
     * @param stream stream information; must not be {@code null}
     * @return the result of the Redis write
     */
    @Override
    public boolean startPlayback(StreamInfo stream) {
        return redis.set(sessionKey(VideoManagerConstants.PLAY_BLACK_PREFIX, stream), stream);
    }

    /**
     * Removes the stream from Redis when playback stops and clears the stream
     * reference on the corresponding channel record.
     *
     * @param streamInfo stream to remove; {@code null} is tolerated
     * @return {@code false} when {@code streamInfo} is {@code null}, otherwise the result of the Redis delete
     */
    @Override
    public boolean stopPlayback(StreamInfo streamInfo) {
        if (streamInfo == null) {
            return false;
        }
        detachStreamFromChannel(streamInfo);
        return redis.del(sessionKey(VideoManagerConstants.PLAY_BLACK_PREFIX, streamInfo));
    }

    /**
     * Finds a playback entry by device id and channel code. When no key matches
     * {@code PREFIX_*_deviceId_code}, a second scan looks for keys whose last
     * segment equals the device id.
     *
     * @param deviceId device id
     * @param code     value matched against the channel-id part of the key
     * @return the first matching entry, or {@code null} when neither scan finds a key
     */
    @Override
    public StreamInfo queryPlaybackByDevice(String deviceId, String code) {
        List<Object> keys = redis.scan(String.format("%S_*_%s_%s",
                VideoManagerConstants.PLAY_BLACK_PREFIX, deviceId, code));
        if (keys == null || keys.isEmpty()) {
            keys = redis.scan(String.format("%S_*_*_%s",
                    VideoManagerConstants.PLAY_BLACK_PREFIX, deviceId));
        }
        if (keys == null || keys.isEmpty()) {
            return null;
        }
        return (StreamInfo) redis.get(keys.get(0).toString());
    }

    /** Builds the Redis key {@code PREFIX_streamId_deviceId_channelId} for a session. */
    private static String sessionKey(String prefix, StreamInfo streamInfo) {
        return String.format("%S_%s_%s_%s",
                prefix,
                streamInfo.getStreamId(),
                streamInfo.getDeviceID(),
                streamInfo.getCahnnelId());
    }

    /** Scans Redis with the pattern and returns the value of the first key found, or {@code null}. */
    private StreamInfo firstMatch(String pattern) {
        List<Object> keys = redis.scan(pattern);
        if (keys == null || keys.isEmpty()) {
            return null;
        }
        return (StreamInfo) redis.get(keys.get(0).toString());
    }

    /** Clears stream id and play flag on the channel record the stream belongs to, if that record exists. */
    private void detachStreamFromChannel(StreamInfo streamInfo) {
        DeviceChannel deviceChannel =
                deviceChannelMapper.queryChannel(streamInfo.getDeviceID(), streamInfo.getCahnnelId());
        if (deviceChannel == null) {
            return;
        }
        deviceChannel.setStreamId(null);
        // NOTE(review): confirm DeviceChannel still declares setPlay(boolean).
        deviceChannel.setPlay(false);
        deviceChannel.setDeviceId(streamInfo.getDeviceID());
        deviceChannelMapper.update(deviceChannel);
    }
}
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
// New file @@ -0,0 +1,401 @@
package com.genersoft.iot.vmp.storager.impl;

import java.util.*;

import com.genersoft.iot.vmp.common.PageResult;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.springframework.util.StringUtils;

/**
 * Video device data storage backed by the {@link DeviceMapper} and
 * {@link DeviceChannelMapper} DAOs.
 *
 * <p>Several query methods that the earlier Redis-based implementation provided
 * (filtering and paging) have not been ported to the mappers yet; they are marked
 * with TODO notes below and currently return empty results.
 *
 * @author swwheihei
 * @date 2020-05-06 14:31:42
 */
@Component
public class VideoManagerStoragerImpl implements IVideoManagerStorager {

    @Autowired
    private DeviceMapper deviceMapper;

    @Autowired
    private DeviceChannelMapper deviceChannelMapper;

    /**
     * Checks whether a device with the given ID is stored.
     *
     * @param deviceId device ID
     * @return {@code true} if the mapper finds the device, {@code false} otherwise
     */
    @Override
    public boolean exists(String deviceId) {
        return deviceMapper.getDeviceByDeviceId(deviceId) != null;
    }

    /**
     * Stores a new device.
     *
     * @param device device to store
     * @return {@code true} if the mapper reports a positive result, {@code false} otherwise
     */
    @Override
    public boolean create(Device device) {
        return deviceMapper.add(device) > 0;
    }

    /**
     * Updates a stored device.
     *
     * @param device device carrying the new values
     * @return {@code true} if the mapper reports a positive result, {@code false} otherwise
     */
    @Override
    public boolean updateDevice(Device device) {
        // TODO: the Redis implementation also refreshed the device's channel count here.
        return deviceMapper.update(device) > 0;
    }

    /**
     * Binds the channel to the given device and passes it to the channel mapper's update.
     *
     * @param deviceId ID of the device that owns the channel
     * @param channel  channel to update; its device ID is overwritten with {@code deviceId}
     */
    @Override
    public void updateChannel(String deviceId, DeviceChannel channel) {
        channel.setDeviceId(deviceId);
        deviceChannelMapper.update(channel);
        // TODO: the Redis implementation also maintained the sub-channel count of this
        // channel and of its parent channel, and the device's channel count.
    }

    /**
     * Looks up a device.
     *
     * @param deviceId device ID
     * @return the device, or {@code null} when the mapper finds none
     */
    @Override
    public Device queryVideoDevice(String deviceId) {
        return deviceMapper.getDeviceByDeviceId(deviceId);
    }

    /**
     * Paged, filtered channel query for one device.
     *
     * <p>TODO: not ported yet. The filter arguments ({@code query}, {@code hasSubChannel},
     * {@code online}) and the paging arguments are ignored and an unpopulated
     * {@link PageResult} is returned.
     *
     * @param deviceId      device ID
     * @param query         search text (currently ignored)
     * @param hasSubChannel sub-channel filter (currently ignored)
     * @param online        online filter (currently ignored)
     * @param page          page index (currently ignored)
     * @param count         page size (currently ignored)
     * @return a new, unpopulated page result
     */
    @Override
    public PageResult queryChannelsByDeviceId(String deviceId, String query, Boolean hasSubChannel, String online, int page, int count) {
        PageResult<DeviceChannel> pageResult = new PageResult<>();
        // NOTE(review): the query result is discarded; kept only to preserve current behavior.
        deviceChannelMapper.queryChannelsByDeviceId(deviceId);
        return pageResult;
    }

    /**
     * Returns the channels the mapper finds for the given device.
     *
     * @param deviceId device ID
     * @return channel list as returned by the mapper
     */
    @Override
    public List<DeviceChannel> queryChannelsByDeviceId(String deviceId) {
        return deviceChannelMapper.queryChannelsByDeviceId(deviceId);
    }

    /**
     * Paged, filtered sub-channel query.
     *
     * <p>TODO: not ported yet. The filter and paging arguments are ignored and an
     * unpopulated {@link PageResult} is returned.
     *
     * @param deviceId        device ID
     * @param parentChannelId ID of the parent channel
     * @param query           search text (currently ignored)
     * @param hasSubChannel   sub-channel filter (currently ignored)
     * @param online          online filter (currently ignored)
     * @param page            page index (currently ignored)
     * @param count           page size (currently ignored)
     * @return a new, unpopulated page result
     */
    @Override
    public PageResult querySubChannels(String deviceId, String parentChannelId, String query, Boolean hasSubChannel, String online, int page, int count) {
        // NOTE(review): the query result is discarded; kept only to preserve current behavior.
        deviceChannelMapper.queryChannelsByDeviceId(deviceId, parentChannelId);
        PageResult<DeviceChannel> pageResult = new PageResult<>();
        return pageResult;
    }

    /**
     * Sub-channel query without paging.
     *
     * <p>TODO: not ported yet; always returns a new empty list.
     *
     * @param deviceId        device ID (currently ignored)
     * @param parentChannelId ID of the parent channel (currently ignored)
     * @return a new empty, mutable list
     */
    public List<DeviceChannel> querySubChannels(String deviceId, String parentChannelId) {
        return new ArrayList<>();
    }

    /**
     * Looks up a single channel of a device.
     *
     * @param deviceId  device ID
     * @param channelId channel ID
     * @return whatever the channel mapper returns for this pair
     */
    @Override
    public DeviceChannel queryChannel(String deviceId, String channelId) {
        return deviceChannelMapper.queryChannel(deviceId, channelId);
    }

    /**
     * Paged device query.
     *
     * <p>TODO: not ported yet. All arguments are ignored and an unpopulated
     * {@link PageResult} is returned.
     *
     * @param deviceIds device IDs to fetch (currently ignored)
     * @param page      page index (currently ignored)
     * @param count     page size (currently ignored)
     * @return a new, unpopulated page result
     */
    @Override
    public PageResult<Device> queryVideoDeviceList(String[] deviceIds, int page, int count) {
        PageResult<Device> pageResult = new PageResult<>();
        return pageResult;
    }

    /**
     * Returns the device list provided by the device mapper.
     *
     * @return device list as returned by the mapper
     */
    @Override
    public List<Device> queryVideoDeviceList() {
        // TODO: the Redis implementation also marked devices without a keep-alive key as offline.
        return deviceMapper.getDevices();
    }

    /**
     * Deletes a device.
     *
     * @param deviceId device ID
     * @return {@code true} if the mapper reports a positive result, {@code false} otherwise
     */
    @Override
    public boolean delete(String deviceId) {
        return deviceMapper.del(deviceId) > 0;
    }

    /**
     * Marks a device as online.
     *
     * @param deviceId device ID
     * @return {@code true} if the device was found and the update reported a positive
     *         result; {@code false} if the device is unknown or the update did not
     */
    @Override
    public boolean online(String deviceId) {
        return updateOnlineFlag(deviceId, 1);
    }

    /**
     * Marks a device as offline.
     *
     * @param deviceId device ID
     * @return {@code true} if the device was found and the update reported a positive
     *         result; {@code false} if the device is unknown or the update did not
     */
    @Override
    public boolean outline(String deviceId) {
        return updateOnlineFlag(deviceId, 0);
    }

    /**
     * Passes the channel-clean request for the given device to the channel mapper.
     *
     * @param deviceId device ID
     */
    @Override
    public void cleanChannelsForDevice(String deviceId) {
        deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
    }

    /**
     * Loads the device, sets its online flag and persists it.
     *
     * @param deviceId   device ID
     * @param onlineFlag value passed to {@link Device#setOnline(int)}
     * @return {@code false} when the device does not exist or the update reports no
     *         positive result, {@code true} otherwise
     */
    private boolean updateOnlineFlag(String deviceId, int onlineFlag) {
        Device device = deviceMapper.getDeviceByDeviceId(deviceId);
        if (device == null) {
            // Unknown device: report failure instead of throwing a NullPointerException.
            return false;
        }
        device.setOnline(onlineFlag);
        return deviceMapper.update(device) > 0;
    }
}
// src/main/java/com/genersoft/iot/vmp/storager/jdbc/VideoManagerJdbcStoragerImpl.java
File was deleted src/main/java/com/genersoft/iot/vmp/storager/redis/VideoManagerRedisStoragerImpl.java
File was deleted src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java
@@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.vmanager.service.IPlayService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +47,9 @@ private IVideoManagerStorager storager; @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private ZLMRESTfulUtils zlmresTfulUtils; @Autowired @@ -60,7 +64,7 @@ Device device = storager.queryVideoDevice(deviceId); StreamInfo streamInfo = storager.queryPlayByDevice(deviceId, channelId); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); UUID uuid = UUID.randomUUID(); DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>(); @@ -89,7 +93,7 @@ msg.setData(JSON.toJSONString(streamInfo)); resultHolder.invokeResult(msg); } else { storager.stopPlay(streamInfo); redisCatchStorage.stopPlay(streamInfo); cmder.playStreamCmd(device, channelId, (JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString()); @@ -117,25 +121,59 @@ } @PostMapping("/play/{streamId}/stop") public ResponseEntity<String> playStop(@PathVariable String streamId) { public DeferredResult<ResponseEntity<String>> playStop(@PathVariable String streamId) { cmder.streamByeCmd(streamId); StreamInfo streamInfo = storager.queryPlayByStreamId(streamId); if (streamInfo == null) return new ResponseEntity<String>("streamId not found", HttpStatus.OK); storager.stopPlay(streamInfo); if (logger.isDebugEnabled()) { logger.debug(String.format("设备预览停止API调用,streamId:%s", streamId)); } logger.debug(String.format("设备预览/回放停止API调用,streamId:%s", streamId)); UUID uuid = UUID.randomUUID(); DeferredResult<ResponseEntity<String>> result = new 
DeferredResult<ResponseEntity<String>>(); // 录像查询以channelId作为deviceId查询 resultHolder.put(DeferredResultHolder.CALLBACK_CMD_STOP + uuid, result); cmder.streamByeCmd(streamId, event -> { StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); if (streamInfo == null) { RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); msg.setData("streamId not found"); resultHolder.invokeResult(msg); redisCatchStorage.stopPlay(streamInfo); } RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_STOP + uuid); Response response = event.getResponse(); msg.setData(String.format("success")); resultHolder.invokeResult(msg); }); if (streamId != null) { JSONObject json = new JSONObject(); json.put("streamId", streamId); return new ResponseEntity<String>(json.toString(), HttpStatus.OK); RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); msg.setData(json.toString()); resultHolder.invokeResult(msg); } else { logger.warn("设备预览停止API调用失败!"); return new ResponseEntity<String>(HttpStatus.INTERNAL_SERVER_ERROR); logger.warn("设备预览/回放停止API调用失败!"); RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); msg.setData("streamId null"); resultHolder.invokeResult(msg); } // 超时处理 result.onTimeout(()->{ logger.warn(String.format("设备预览/回放停止超时,streamId:%s ", streamId)); RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_STOP + uuid); msg.setData("Timeout"); resultHolder.invokeResult(msg); }); return result; } /** @@ -145,7 +183,7 @@ */ @PostMapping("/play/{streamId}/convert") public ResponseEntity<String> playConvert(@PathVariable String streamId) { StreamInfo streamInfo = storager.queryPlayByStreamId(streamId); StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); if (streamInfo == null) { logger.warn("视频转码API调用失败!, 视频流已经停止!"); return new 
ResponseEntity<String>("未找到视频流信息, 视频流可能已经停止", HttpStatus.OK); @@ -155,7 +193,7 @@ logger.warn("视频转码API调用失败!, 视频流已停止推流!"); return new ResponseEntity<String>("推流信息在流媒体中不存在, 视频流可能已停止推流", HttpStatus.OK); } else { MediaServerConfig mediaInfo = storager.getMediaInfo(); MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); String dstUrl = String.format("rtmp://%s:%s/convert/%s", "127.0.0.1", mediaInfo.getRtmpPort(), streamId ); String srcUrl = String.format("rtsp://%s:%s/rtp/%s", "127.0.0.1", mediaInfo.getRtspPort(), streamId); src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java
@@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.conf.MediaServerConfig; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.vmanager.play.PlayController; import com.genersoft.iot.vmp.vmanager.service.IPlayService; @@ -25,6 +26,9 @@ private IVideoManagerStorager storager; @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private DeferredResultHolder resultHolder; @Override @@ -33,7 +37,7 @@ msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); StreamInfo streamInfo = onPublishHandler(resonse, deviceId, channelId, uuid); if (streamInfo != null) { storager.startPlay(streamInfo); redisCatchStorage.startPlay(streamInfo); msg.setData(JSON.toJSONString(streamInfo)); resultHolder.invokeResult(msg); } else { @@ -49,7 +53,7 @@ msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); StreamInfo streamInfo = onPublishHandler(resonse, deviceId, channelId, uuid); if (streamInfo != null) { storager.startPlayback(streamInfo); redisCatchStorage.startPlayback(streamInfo); msg.setData(JSON.toJSONString(streamInfo)); resultHolder.invokeResult(msg); } else { @@ -65,7 +69,7 @@ streamInfo.setStreamId(streamId); streamInfo.setDeviceID(deviceId); streamInfo.setCahnnelId(channelId); MediaServerConfig mediaServerConfig = storager.getMediaInfo(); MediaServerConfig mediaServerConfig = redisCatchStorage.getMediaInfo(); streamInfo.setFlv(String.format("http://%s:%s/rtp/%s.flv", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId)); streamInfo.setWs_flv(String.format("ws://%s:%s/rtp/%s.flv", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId)); src/main/java/com/genersoft/iot/vmp/web/ApiDeviceController.java
@@ -65,7 +65,7 @@ JSONObject result = new JSONObject(); List<Device> devices; if (start == null || limit ==null) { devices = storager.queryVideoDeviceList(null); devices = storager.queryVideoDeviceList(); result.put("DeviceCount", devices.size()); }else { PageResult<Device> deviceList = storager.queryVideoDeviceList(null, start/limit, limit); src/main/java/com/genersoft/iot/vmp/web/ApiStreamController.java
@@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.vmanager.play.PlayController; import org.slf4j.Logger; @@ -34,6 +35,9 @@ @Autowired private IVideoManagerStorager storager; @Autowired private IRedisCatchStorage redisCatchStorage; private boolean closeWaitRTPInfo = false; @@ -158,14 +162,14 @@ ){ StreamInfo streamInfo = storager.queryPlayByDevice(serial, code); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(serial, code); if (streamInfo == null) { JSONObject result = new JSONObject(); result.put("error","未找到流信息"); return result; } cmder.streamByeCmd(streamInfo.getStreamId()); storager.stopPlay(streamInfo); redisCatchStorage.stopPlay(streamInfo); return null; } src/main/resources/wvp.sqliteBinary files differ
web_src/src/components/channelList.vue
@@ -104,7 +104,7 @@ mounted() { this.initData(); this.updateLooper = setInterval(this.initData, 10000); this.updateLooper = setInterval(this.initData, 1000); }, destroyed() { this.$destroy('videojs');