src/main/java/com/genersoft/iot/vmp/gb28181/bean/SDPInfo.java
New file @@ -0,0 +1,14 @@
package com.genersoft.iot.vmp.gb28181.bean;

import javax.sdp.SessionDescription;

/**
 * Mutable holder for the pieces of an SDP message that the GB28181 signalling
 * code works with.
 *
 * <p>The class is a plain bean: it performs no parsing or validation itself,
 * every property simply stores what the caller puts in. All properties are
 * {@code null} until set. Instances are not thread-safe.
 */
public class SDPInfo {

    /** Raw bytes associated with this description — presumably the unparsed SDP body; TODO confirm with the producer. */
    private byte[] source;

    /** Session description object from the JAIN SDP API. */
    private SessionDescription sdpSource;

    /** Session name — presumably the value of the SDP {@code s=} line; TODO confirm. */
    private String sessionName;

    /** Start of the requested time range — unit and epoch are not visible here; TODO confirm. */
    private Long startTime;

    /** End of the requested time range — unit and epoch are not visible here; TODO confirm. */
    private Long stopTime;

    /** User name — presumably taken from the SDP origin ({@code o=}) line; TODO confirm. */
    private String username;

    /** Address — presumably taken from the SDP origin ({@code o=}) line; TODO confirm. */
    private String address;

    /** SSRC string — presumably the GB28181 {@code y=} extension field; TODO confirm. */
    private String ssrc;

    /**
     * Returns the stored raw bytes.
     *
     * @return the same array instance that was passed to {@link #setSource(byte[])}
     *         (no defensive copy is made), or {@code null} if never set
     */
    public byte[] getSource() {
        return source;
    }

    /**
     * Stores the raw bytes.
     *
     * @param source array to keep; it is stored by reference, not copied
     */
    public void setSource(byte[] source) {
        this.source = source;
    }

    /**
     * @return the stored session description, or {@code null} if never set
     */
    public SessionDescription getSdpSource() {
        return sdpSource;
    }

    /**
     * @param sdpSource session description to store
     */
    public void setSdpSource(SessionDescription sdpSource) {
        this.sdpSource = sdpSource;
    }

    /**
     * @return the stored session name, or {@code null} if never set
     */
    public String getSessionName() {
        return sessionName;
    }

    /**
     * @param sessionName session name to store
     */
    public void setSessionName(String sessionName) {
        this.sessionName = sessionName;
    }

    /**
     * @return the stored start time, or {@code null} if never set
     */
    public Long getStartTime() {
        return startTime;
    }

    /**
     * @param startTime start time to store
     */
    public void setStartTime(Long startTime) {
        this.startTime = startTime;
    }

    /**
     * @return the stored stop time, or {@code null} if never set
     */
    public Long getStopTime() {
        return stopTime;
    }

    /**
     * @param stopTime stop time to store
     */
    public void setStopTime(Long stopTime) {
        this.stopTime = stopTime;
    }

    /**
     * @return the stored user name, or {@code null} if never set
     */
    public String getUsername() {
        return username;
    }

    /**
     * @param username user name to store
     */
    public void setUsername(String username) {
        this.username = username;
    }

    /**
     * @return the stored address, or {@code null} if never set
     */
    public String getAddress() {
        return address;
    }

    /**
     * @param address address to store
     */
    public void setAddress(String address) {
        this.address = address;
    }

    /**
     * @return the stored SSRC string, or {@code null} if never set
     */
    public String getSsrc() {
        return ssrc;
    }

    /**
     * @param ssrc SSRC string to store
     */
    public void setSsrc(String ssrc) {
        this.ssrc = ssrc;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -453,6 +453,7 @@ subscribeKey.put("app", "rtp"); subscribeKey.put("stream", ssrcInfo.getStream()); subscribeKey.put("regist", true); subscribeKey.put("schema", "rtmp"); subscribeKey.put("mediaServerId", mediaServerItem.getId()); logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, @@ -718,6 +719,7 @@ if (ssrcTransaction != null) { MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId()); mediaServerService.releaseSsrc(mediaServerItem, ssrcTransaction.getSsrc()); mediaServerService.closeRTPServer(deviceId, channelId, ssrcTransaction.getStream()); streamSession.remove(deviceId, channelId, ssrcTransaction.getStream()); } } catch (SipException | ParseException e) { src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -68,7 +68,7 @@ */ @Override public void process(RequestEvent evt) { logger.debug("ACK请求: {}", ((System.currentTimeMillis()))); logger.info("ACK请求: {}", ((System.currentTimeMillis()))); Dialog dialog = evt.getDialog(); if (dialog == null) return; if (dialog.getState()== DialogState.CONFIRMED) { @@ -88,10 +88,6 @@ streamInfo = new StreamInfo(); streamInfo.setApp(sendRtpItem.getApp()); streamInfo.setStream(sendRtpItem.getStreamId()); }else { streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); sendRtpItem.setStreamId(streamInfo.getStream()); streamInfo.setApp("rtp"); } redisCatchStorage.updateSendRTPSever(sendRtpItem); logger.info(platformGbId); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -2,6 +2,7 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; @@ -90,29 +91,31 @@ int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); if (totalReaderCount == 0) { logger.info(streamId + "无其它观看者,通知设备停止推流"); cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId); cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId); }else if (totalReaderCount == -1){ logger.warn(streamId + " 查找其它观看者失败"); } } // 可能是设备主动停止 Device device = storager.queryVideoDeviceByChannelId(platformGbId); if (device != null) { if (sendRtpItem.isPlay()) { StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); if (streamInfo != null) { redisCatchStorage.stopPlay(streamInfo); if (device != null) { StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); if (sendRtpItem != null) { if (sendRtpItem.isPlay()) { if (streamInfo != null) { redisCatchStorage.stopPlay(streamInfo); } }else { if (streamInfo != null) { redisCatchStorage.stopPlayback(streamInfo); } } }else { StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(device.getDeviceId(), channelId); if (streamInfo != null) { redisCatchStorage.stopPlayback(streamInfo); } } storager.stopPlay(device.getDeviceId(), channelId); mediaServerService.closeRTPServer(device, channelId, streamInfo.getStream()); storager.stopPlay(device.getDeviceId(), channelId); mediaServerService.closeRTPServer(device.getDeviceId(), channelId, streamInfo.getStream()); } } } } catch (SipException e) { e.printStackTrace(); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -101,19 +102,12 @@ @Override public void process(RequestEvent evt) { // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 Long startTimeForInvite = System.currentTimeMillis(); try { Request request = evt.getRequest(); SipURI sipURI = (SipURI) request.getRequestURI(); String channelId = sipURI.getUser(); String requesterId = null; FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME); String requesterId = SipUtils.getUserIdFromFromHeader(request); CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); AddressImpl address = (AddressImpl) fromHeader.getAddress(); SipUri uri = (SipUri) address.getURI(); requesterId = uri.getUser(); if (requesterId == null || channelId == null) { logger.info("无法从FromHeader的Address中获取到平台id,返回400"); responseAck(evt, Response.BAD_REQUEST); // 参数不全, 发400,请求错误 @@ -122,7 +116,9 @@ // 查询请求是否来自上级平台\设备 ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); if (platform != null) { if (platform == null) { inviteFromDeviceHandle(evt, requesterId); }else { // 查询平台下是否有该通道 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); @@ -141,7 +137,7 @@ mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem == null) { logger.info("[ app={}, stream={} ]找不到zlm {},返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId); responseAck(evt, Response.GONE, "media server 
not found"); responseAck(evt, Response.GONE); return; } Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); @@ -197,7 +193,6 @@ // 查看是否支持PS 负载96 //String ip = null; int port = -1; //boolean recvonly = false; boolean mediaTransmissionTCP = false; Boolean tcpActive = null; for (Object description : mediaDescriptions) { @@ -233,7 +228,6 @@ } String username = sdp.getOrigin().getUsername(); String addressStr = sdp.getOrigin().getAddress(); //String sessionName = sdp.getSessionName().getValue(); logger.info("[上级点播]用户:{}, 地址:{}:{}, ssrc:{}", username, addressStr, port, ssrc); Device device = null; // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 @@ -271,8 +265,10 @@ Long finalStartTime = startTime; Long finalStopTime = stopTime; ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{ logger.info("[上级点播]收到下级开始点播订阅, {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId()); // if (sendRtpItem == null) return; logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId()); // * 0 等待设备推流上来 // * 1 下级已经推流,等待上级平台回复ack // * 2 推流中 sendRtpItem.setStatus(1); redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -301,9 +297,6 @@ } catch (ParseException e) { e.printStackTrace(); } if ("Playback".equals(sessionName) && responseJSON != null) { playService.onPublishHandlerForPlayBack(finalMediaServerItem, responseJSON, finalDevice.getDeviceId(), channelId, null); } }; SipSubscribe.Event errorEvent = ((event) -> { // 未知错误。直接转发设备点播的错误 @@ -319,10 +312,29 @@ }); if ("Playback".equals(sessionName)) { sendRtpItem.setPlay(false); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, sendRtpItem.getSsrc(), true); sendRtpItem.setStreamId(ssrc); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); commander.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, format.format(start), format.format(end), hookEvent, errorEvent); 
playService.playBack(device.getDeviceId(), channelId, format.format(start), format.format(end),result -> { if (result.getCode() != 0){ logger.warn("录像回放失败"); if (result.getEvent() != null) { errorEvent.response(result.getEvent()); } try { responseAck(evt, Response.REQUEST_TIMEOUT); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } }else { if (result.getMediaServerItem() != null) { hookEvent.response(result.getMediaServerItem(), result.getResponse()); } } }); }else { sendRtpItem.setPlay(true); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); @@ -333,7 +345,7 @@ sendRtpItem.setPlay(false); playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent,errorEvent); }else { sendRtpItem.setStreamId(streamInfo.getStreamId()); sendRtpItem.setStreamId(streamInfo.getStream()); hookEvent.response(mediaServerItem, null); } } @@ -379,72 +391,6 @@ } } } else { // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) Device device = redisCatchStorage.getDevice(requesterId); if (device != null) { logger.info("收到设备" + requesterId + "的语音广播Invite请求"); responseAck(evt, Response.TRYING); String contentString = new String(request.getRawContent()); // jainSip不支持y=字段, 移除移除以解析。 String substring = contentString; String ssrc = "0000000404"; int ssrcIndex = contentString.indexOf("y="); if (ssrcIndex > 0) { substring = contentString.substring(0, ssrcIndex); ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); } ssrcIndex = substring.indexOf("f="); if (ssrcIndex > 0) { substring = contentString.substring(0, ssrcIndex); } SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); // 获取支持的格式 Vector mediaDescriptions = sdp.getMediaDescriptions(true); // 查看是否支持PS 负载96 int port = -1; //boolean recvonly = false; boolean mediaTransmissionTCP = false; Boolean tcpActive = null; for (int i = 0; i < mediaDescriptions.size(); 
i++) { MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i); Media media = mediaDescription.getMedia(); Vector mediaFormats = media.getMediaFormats(false); if (mediaFormats.contains("8")) { port = media.getMediaPort(); String protocol = media.getProtocol(); // 区分TCP发流还是udp, 当前默认udp if ("TCP/RTP/AVP".equals(protocol)) { String setup = mediaDescription.getAttribute("setup"); if (setup != null) { mediaTransmissionTCP = true; if ("active".equals(setup)) { tcpActive = true; } else if ("passive".equals(setup)) { tcpActive = false; } } } break; } } if (port == -1) { logger.info("不支持的媒体格式,返回415"); // 回复不支持的格式 responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 return; } String username = sdp.getOrigin().getUsername(); String addressStr = sdp.getOrigin().getAddress(); logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc); } else { logger.warn("来自无效设备/平台的请求"); responseAck(evt, Response.BAD_REQUEST); } } } catch (SipException | InvalidArgumentException | ParseException e) { @@ -457,4 +403,74 @@ e.printStackTrace(); } } public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException { // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) Device device = redisCatchStorage.getDevice(requesterId); Request request = evt.getRequest(); if (device != null) { logger.info("收到设备" + requesterId + "的语音广播Invite请求"); responseAck(evt, Response.TRYING); String contentString = new String(request.getRawContent()); // jainSip不支持y=字段, 移除移除以解析。 String substring = contentString; String ssrc = "0000000404"; int ssrcIndex = contentString.indexOf("y="); if (ssrcIndex > 0) { substring = contentString.substring(0, ssrcIndex); ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); } ssrcIndex = substring.indexOf("f="); if (ssrcIndex > 0) { substring = contentString.substring(0, ssrcIndex); } SessionDescription sdp = 
SdpFactory.getInstance().createSessionDescription(substring); // 获取支持的格式 Vector mediaDescriptions = sdp.getMediaDescriptions(true); // 查看是否支持PS 负载96 int port = -1; //boolean recvonly = false; boolean mediaTransmissionTCP = false; Boolean tcpActive = null; for (int i = 0; i < mediaDescriptions.size(); i++) { MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i); Media media = mediaDescription.getMedia(); Vector mediaFormats = media.getMediaFormats(false); if (mediaFormats.contains("8")) { port = media.getMediaPort(); String protocol = media.getProtocol(); // 区分TCP发流还是udp, 当前默认udp if ("TCP/RTP/AVP".equals(protocol)) { String setup = mediaDescription.getAttribute("setup"); if (setup != null) { mediaTransmissionTCP = true; if ("active".equals(setup)) { tcpActive = true; } else if ("passive".equals(setup)) { tcpActive = false; } } } break; } } if (port == -1) { logger.info("不支持的媒体格式,返回415"); // 回复不支持的格式 responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 return; } String username = sdp.getOrigin().getUsername(); String addressStr = sdp.getOrigin().getAddress(); logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc); } else { logger.warn("来自无效设备/平台的请求"); responseAck(evt, Response.BAD_REQUEST); } } } src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
@@ -48,7 +48,7 @@ SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean isPlayback); void closeRTPServer(Device device, String channelId, String ssrc); void closeRTPServer(String deviceId, String channelId, String ssrc); void clearRTPServer(MediaServerItem mediaServerItem); src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackCallback.java
@@ -1,9 +1,10 @@
package com.genersoft.iot.vmp.service.bean;

import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;

/**
 * Callback through which a record-playback request reports its outcome to
 * the caller that started it.
 *
 * <p>NOTE(review): this text comes from a diff view that shows removed and
 * added lines side by side without markers. The two {@code call}
 * declarations below look like the pre-change and post-change form of a
 * single method rather than two intended overloads — confirm against the
 * real file before relying on either.
 */
public interface PlayBackCallback {

    /**
     * Delivers the outcome as a bare request message.
     *
     * @param msg message describing the outcome
     */
    void call(RequestMessage msg);

    /**
     * Delivers the outcome wrapped in a {@link PlayBackResult}.
     *
     * @param msg result wrapper whose data is a {@link RequestMessage}
     */
    void call(PlayBackResult<RequestMessage> msg);

}
src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java
New file @@ -0,0 +1,55 @@
package com.genersoft.iot.vmp.service.bean;

import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;

import javax.sip.RequestEvent;

/**
 * Outcome of a record-playback request, handed to a {@code PlayBackCallback}.
 *
 * <p>This is a plain mutable bean: it stores what the producer sets and does
 * no validation. Reference properties are {@code null} and {@code code} is
 * {@code 0} until set.
 *
 * @param <T> type of the payload carried in {@link #getData()}
 */
public class PlayBackResult<T> {

    /** Status code; the callers visible alongside this class treat {@code 0} as success and any other value as failure. */
    private int code;

    /** Payload describing the outcome. */
    private T data;

    /** Media server attached to the result, if the producer supplied one. */
    private MediaServerItem mediaServerItem;

    /** JSON response attached to the result, if the producer supplied one. */
    private JSONObject response;

    /** SIP event result attached to the result, if the producer supplied one. */
    private SipSubscribe.EventResult event;

    // ---- readers ----

    /**
     * @return the status code
     */
    public int getCode() {
        return code;
    }

    /**
     * @return the payload, or {@code null} if none was set
     */
    public T getData() {
        return data;
    }

    /**
     * @return the attached media server, or {@code null} if none was set
     */
    public MediaServerItem getMediaServerItem() {
        return mediaServerItem;
    }

    /**
     * @return the attached JSON response, or {@code null} if none was set
     */
    public JSONObject getResponse() {
        return response;
    }

    /**
     * @return the attached SIP event result, or {@code null} if none was set
     */
    public SipSubscribe.EventResult getEvent() {
        return event;
    }

    // ---- writers ----

    /**
     * @param code status code to store
     */
    public void setCode(int code) {
        this.code = code;
    }

    /**
     * @param data payload to store
     */
    public void setData(T data) {
        this.data = data;
    }

    /**
     * @param mediaServerItem media server to attach
     */
    public void setMediaServerItem(MediaServerItem mediaServerItem) {
        this.mediaServerItem = mediaServerItem;
    }

    /**
     * @param response JSON response to attach
     */
    public void setResponse(JSONObject response) {
        this.response = response;
    }

    /**
     * @param event SIP event result to attach
     */
    public void setEvent(SipSubscribe.EventResult event) {
        this.event = event;
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -160,16 +160,16 @@ } @Override public void closeRTPServer(Device device, String channelId, String stream) { String mediaServerId = streamSession.getMediaServerId(device.getDeviceId(), channelId, stream); String ssrc = streamSession.getSSRC(device.getDeviceId(), channelId, stream); public void closeRTPServer(String deviceId, String channelId, String stream) { String mediaServerId = streamSession.getMediaServerId(deviceId, channelId, stream); String ssrc = streamSession.getSSRC(deviceId, channelId, stream); MediaServerItem mediaServerItem = this.getOne(mediaServerId); if (mediaServerItem != null) { String streamId = String.format("%s_%s", device.getDeviceId(), channelId); String streamId = String.format("%s_%s", deviceId, channelId); zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId); releaseSsrc(mediaServerItem, ssrc); } streamSession.remove(device.getDeviceId(), channelId, stream); streamSession.remove(deviceId, channelId, stream); } @Override src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.service.bean.PlayBackResult; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; @@ -115,11 +116,8 @@ msg.setData(wvpResult); // 点播超时回复BYE cmder.streamByeCmd(device.getDeviceId(), channelId, streamInfo.getStream()); // 释放rtpserver mediaServerService.closeRTPServer(playResult.getDevice(), channelId, streamInfo.getStream()); // 回复之前所有的点播请求 resultHolder.invokeAllResult(msg); // TODO 释放ssrc }); result.onCompletion(()->{ // 点播结束时调用截图接口 @@ -173,7 +171,10 @@ WVPResult wvpResult = new WVPResult(); wvpResult.setCode(-1); // 点播返回sip错误 mediaServerService.closeRTPServer(playResult.getDevice(), channelId, ssrcInfo.getStream()); mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream()); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg)); msg.setData(wvpResult); resultHolder.invokeAllResult(msg); @@ -222,7 +223,10 @@ logger.info("收到订阅消息: " + response.toJSONString()); onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid); }, (event) -> { mediaServerService.closeRTPServer(playResult.getDevice(), channelId, ssrcInfo.getStream()); mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream()); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); WVPResult wvpResult = new WVPResult(); wvpResult.setCode(-1); wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", 
event.statusCode, event.msg)); @@ -240,7 +244,7 @@ RequestMessage msg = new RequestMessage(); msg.setId(uuid); msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); StreamInfo streamInfo = onPublishHandler(mediaServerItem, resonse, deviceId, channelId); StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { @@ -298,9 +302,12 @@ RequestMessage msg = new RequestMessage(); msg.setId(uuid); msg.setKey(key); PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>(); result.onTimeout(()->{ msg.setData("回放超时"); callback.call(msg); playBackResult.setCode(-1); playBackResult.setData(msg); callback.call(playBackResult); }); cmder.playbackStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, (MediaServerItem mediaServerItem, JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); @@ -308,15 +315,24 @@ if (streamInfo == null) { logger.warn("设备回放API调用失败!"); msg.setData("设备回放API调用失败!"); callback.call(msg); playBackResult.setCode(-1); playBackResult.setData(msg); callback.call(playBackResult); return; } redisCatchStorage.startPlayback(streamInfo); msg.setData(JSON.toJSONString(streamInfo)); callback.call(msg); playBackResult.setCode(0); playBackResult.setData(msg); playBackResult.setMediaServerItem(mediaServerItem); playBackResult.setResponse(response); callback.call(playBackResult); }, event -> { msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)); callback.call(msg); playBackResult.setCode(-1); playBackResult.setData(msg); playBackResult.setEvent(event); callback.call(playBackResult); }); return result; } src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
@@ -129,7 +129,6 @@ //Response response = event.getResponse(); msg.setData(String.format("success")); resultHolder.invokeAllResult(msg); mediaServerService.closeRTPServer(device, channelId, streamInfo.getStream()); }); if (deviceId != null || channelId != null) { src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java
@@ -77,8 +77,8 @@ logger.debug(String.format("设备回放 API调用,deviceId:%s ,channelId:%s", deviceId, channelId)); } DeferredResult<ResponseEntity<String>> result = playService.playBack(deviceId, channelId, startTime, endTime, msg->{ resultHolder.invokeResult(msg); DeferredResult<ResponseEntity<String>> result = playService.playBack(deviceId, channelId, startTime, endTime, wvpResult->{ resultHolder.invokeResult(wvpResult.getData()); }); return result;