src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
@@ -15,6 +15,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.response.impl.*; import com.genersoft.iot.vmp.gb28181.transmit.response.impl.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.vmanager.service.IPlayService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -103,6 +104,9 @@ @Autowired private OtherResponseProcessor otherResponseProcessor; @Autowired private IPlayService playService; // 注:这里使用注解会导致循环依赖注入,暂用springBean private SipProvider tcpSipProvider; @@ -120,7 +124,9 @@ processor.setTcpSipProvider(getTcpSipProvider()); processor.setUdpSipProvider(getUdpSipProvider()); processor.setCmder(cmder); processor.setCmderFroPlatform(cmderFroPlatform); processor.setPlayService(playService); processor.setStorager(storager); return processor; } else if (Request.REGISTER.equals(method)) { src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java
@@ -10,19 +10,26 @@ import javax.sip.message.Request; import javax.sip.message.Response; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.sdp.Codec; import com.genersoft.iot.vmp.gb28181.sdp.MediaDescription; import com.genersoft.iot.vmp.gb28181.sdp.SdpParser; import com.genersoft.iot.vmp.gb28181.sdp.SessionDescription; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult; import com.genersoft.iot.vmp.vmanager.service.IPlayService; import gov.nist.javax.sip.address.AddressImpl; import gov.nist.javax.sip.address.SipUri; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import java.io.IOException; import java.text.ParseException; @@ -40,6 +47,10 @@ private SIPCommanderFroPlatform cmderFroPlatform; private IVideoManagerStorager storager; private SIPCommander cmder; private IPlayService playService; /** * 处理invite请求 @@ -119,7 +130,30 @@ String ssrc = sdp.getSsrc(); Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(platformId, channelId); if (device == null) { logger.warn("点播平台{}的通道{}时未找到设备信息", platformId, channel); response500Ack(evt); return; } // 通知下级推流, PlayResult playResult = playService.play(device.getDeviceId(), channelId, (response)->{ // 收到推流, 回复200OK },(event -> { // 未知错误。直接转发设备点播的错误 Response response = null; try { response = getMessageFactory().createResponse(event.getResponse().getStatusCode(), 
evt.getRequest()); getServerTransaction(evt).sendResponse(response); } catch (ParseException | SipException | InvalidArgumentException e) { e.printStackTrace(); } })); playResult.getResult(); // 查找合适的端口推流, // 发送 200ok // 收到ack后调用推流接口 @@ -149,14 +183,16 @@ } /*** * 回复404 * 回复200 OK * @param evt * @throws SipException * @throws InvalidArgumentException * @throws ParseException */ private void response404Ack(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException { Response response = getMessageFactory().createResponse(Response.NOT_FOUND, evt.getRequest()); private void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException { Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest()); ContentTypeHeader contentTypeHeader = getHeaderFactory().createContentTypeHeader("APPLICATION", "SDP"); response.setContent(sdp, contentTypeHeader); getServerTransaction(evt).sendResponse(response); } @@ -173,6 +209,18 @@ } /*** * 回复404 * @param evt * @throws SipException * @throws InvalidArgumentException * @throws ParseException */ private void response404Ack(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException { Response response = getMessageFactory().createResponse(Response.NOT_FOUND, evt.getRequest()); getServerTransaction(evt).sendResponse(response); } /*** * 回复488 * @param evt * @throws SipException @@ -185,18 +233,18 @@ } /*** * 回复200 OK * 回复500 * @param evt * @throws SipException * @throws InvalidArgumentException * @throws ParseException */ private void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException { Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest()); ContentTypeHeader contentTypeHeader = getHeaderFactory().createContentTypeHeader("APPLICATION", "SDP"); response.setContent(sdp, contentTypeHeader); private void response500Ack(RequestEvent evt) throws 
SipException, InvalidArgumentException, ParseException { Response response = getMessageFactory().createResponse(Response.SERVER_INTERNAL_ERROR, evt.getRequest()); getServerTransaction(evt).sendResponse(response); } @@ -222,4 +270,20 @@ public void setStorager(IVideoManagerStorager storager) { this.storager = storager; } public SIPCommander getCmder() { return cmder; } public void setCmder(SIPCommander cmder) { this.cmder = cmder; } public IPlayService getPlayService() { return playService; } public void setPlayService(IPlayService playService) { this.playService = playService; } } src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java
@@ -234,4 +234,6 @@
    DeviceChannel queryChannelInParentPlatform(String platformId, String channelId);

    /**
     * Looks up a device from a platform id and a channel id.
     *
     * NOTE(review): presumably this resolves the device that owns a channel
     * shared with the given parent platform - confirm against the implementation.
     *
     * @param platformId id of the platform, as passed by the INVITE request handler
     * @param channelId  id of the requested channel
     * @return the matching device, or {@code null} when none is found (the INVITE
     *         request handler in this change set checks the result for null)
     */
    Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
@@ -335,4 +335,9 @@
        DeviceChannel channel = patformChannelMapper.queryChannelInParentPlatform(platformId, channelId);
        return channel;
    }

    /**
     * Placeholder implementation: performs no lookup and always returns {@code null}.
     *
     * The INVITE request handler in this change set treats a null result as
     * "device not found" and answers the request with a 500 response, so until
     * this method is implemented every such request takes that failure path.
     *
     * @param platformId id of the requesting platform (currently unused)
     * @param channelId  id of the requested channel (currently unused)
     * @return always {@code null}
     */
    @Override
    public Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId) {
        // TODO: implement the actual lookup; returning null makes callers report "not found".
        return null;
    }
}
src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java
@@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult; import com.genersoft.iot.vmp.vmanager.service.IPlayService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,62 +65,19 @@ @PathVariable String channelId) { Device device = storager.queryVideoDevice(deviceId); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); UUID uuid = UUID.randomUUID(); DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>(); // 录像查询以channelId作为deviceId查询 resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result); if (streamInfo == null) { // 发送点播消息 cmder.playStreamCmd(device, channelId, (JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString()); }, event -> { RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); Response response = event.getResponse(); msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase())); resultHolder.invokeResult(msg); }); } else { String streamId = streamInfo.getStreamId(); JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(streamId); if (rtpInfo.getBoolean("exist")) { RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); msg.setData(JSON.toJSONString(streamInfo)); resultHolder.invokeResult(msg); } else { redisCatchStorage.stopPlay(streamInfo); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); cmder.playStreamCmd(device, channelId, (JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString()); }, event -> { RequestMessage msg 
= new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); Response response = event.getResponse(); msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase())); resultHolder.invokeResult(msg); }); } } PlayResult playResult = playService.play(deviceId, channelId, null, null); // 超时处理 result.onTimeout(()->{ playResult.getResult().onTimeout(()->{ logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId)); // 释放rtpserver cmder.closeRTPServer(device, channelId); cmder.closeRTPServer(playResult.getDevice(), channelId); RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + playResult.getUuid()); msg.setData("Timeout"); resultHolder.invokeResult(msg); }); return result; return playResult.getResult(); } @PostMapping("/play/{streamId}/stop") src/main/java/com/genersoft/iot/vmp/vmanager/play/bean/PlayResult.java
New file @@ -0,0 +1,37 @@
package com.genersoft.iot.vmp.vmanager.play.bean;

import com.genersoft.iot.vmp.gb28181.bean.Device;
import org.springframework.http.ResponseEntity;
import org.springframework.web.context.request.async.DeferredResult;

/**
 * Mutable holder for everything a play request hands back to its caller:
 * the asynchronous HTTP result, the identifier generated for the request,
 * and the device the request was issued against.
 */
public class PlayResult {

    /** Asynchronous result that is completed once the play request is resolved. */
    private DeferredResult<ResponseEntity<String>> result;

    /** Identifier generated for this play request. */
    private String uuid;

    /** Device the play request targets. */
    private Device device;

    /**
     * @return the asynchronous result associated with this play request
     */
    public DeferredResult<ResponseEntity<String>> getResult() {
        return this.result;
    }

    /**
     * @param result the asynchronous result associated with this play request
     */
    public void setResult(DeferredResult<ResponseEntity<String>> result) {
        this.result = result;
    }

    /**
     * @return the identifier generated for this play request
     */
    public String getUuid() {
        return this.uuid;
    }

    /**
     * @param uuid the identifier generated for this play request
     */
    public void setUuid(String uuid) {
        this.uuid = uuid;
    }

    /**
     * @return the device the play request targets
     */
    public Device getDevice() {
        return this.device;
    }

    /**
     * @param device the device the play request targets
     */
    public void setDevice(Device device) {
        this.device = device;
    }
}
src/main/java/com/genersoft/iot/vmp/vmanager/service/IPlayService.java
@@ -2,6 +2,11 @@
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult;
import org.springframework.http.ResponseEntity;
import org.springframework.web.context.request.async.DeferredResult;

/**
 * Play (live stream on demand) handling
@@ -10,4 +15,6 @@
    void onPublishHandlerForPlayBack(JSONObject resonse, String deviceId, String channelId, String uuid);

    void onPublishHandlerForPlay(JSONObject resonse, String deviceId, String channelId, String uuid);

    /**
     * Requests live play of a device channel and returns the handles for the request.
     *
     * @param deviceId   id of the device to play from
     * @param channelId  id of the channel to play
     * @param event      optional callback (callers may pass {@code null}) that receives the
     *                   media-server hook response once a newly requested stream is published.
     *                   NOTE(review): whether it also fires when an existing stream is
     *                   reused depends on the implementation - confirm.
     * @param errorEvent optional callback (callers may pass {@code null}) that receives the
     *                   SIP event when the play command is answered with an error
     * @return a {@link PlayResult} carrying the device, the generated request id and the
     *         deferred HTTP result
     */
    PlayResult play(String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent);
}
src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java
@@ -4,19 +4,29 @@ import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaServerConfig; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.vmanager.play.PlayController; import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult; import com.genersoft.iot.vmp.vmanager.service.IPlayService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import org.springframework.web.context.request.async.DeferredResult; import javax.sip.message.Response; import java.text.DecimalFormat; import java.util.UUID; @Service public class PlayServiceImpl implements IPlayService { @@ -27,11 +37,76 @@ private IVideoManagerStorager storager; @Autowired private SIPCommander cmder; @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private DeferredResultHolder resultHolder; @Autowired private ZLMRESTfulUtils zlmresTfulUtils; @Override public PlayResult play(String deviceId, String channelId, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) { PlayResult playResult = new PlayResult(); Device device = storager.queryVideoDevice(deviceId); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); 
playResult.setDevice(device); UUID uuid = UUID.randomUUID(); playResult.setUuid(uuid.toString()); DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>(); playResult.setResult(result); // 录像查询以channelId作为deviceId查询 resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result); if (streamInfo == null) { // 发送点播消息 cmder.playStreamCmd(device, channelId, (JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString()); if (hookEvent != null) { hookEvent.response(response); } }, event -> { RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); Response response = event.getResponse(); msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase())); resultHolder.invokeResult(msg); if (errorEvent != null) { errorEvent.response(event); } }); } else { String streamId = streamInfo.getStreamId(); JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(streamId); if (rtpInfo.getBoolean("exist")) { RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); msg.setData(JSON.toJSONString(streamInfo)); resultHolder.invokeResult(msg); } else { redisCatchStorage.stopPlay(streamInfo); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); cmder.playStreamCmd(device, channelId, (JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString()); }, event -> { RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); Response response = event.getResponse(); msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase())); resultHolder.invokeResult(msg); }); } } return playResult; } @Override public void onPublishHandlerForPlay(JSONObject resonse, String deviceId, 
String channelId, String uuid) { RequestMessage msg = new RequestMessage();