src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -8,8 +8,10 @@ import java.util.concurrent.TimeUnit; import javax.sip.*; import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -33,6 +35,9 @@ @Autowired private SIPProcessorFactory processorFactory; @Autowired private SipSubscribe sipSubscribe; private SipStack sipStack; @@ -139,11 +144,19 @@ // 增加其它无需回复的响应,如101、180等 } else { logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/); if (evt.getResponse() != null && sipSubscribe.getSize() > 0 ) { CallIdHeader callIdHeader = (CallIdHeader)evt.getResponse().getHeader(CallIdHeader.NAME); if (callIdHeader != null) { SipSubscribe.Event subscribe = sipSubscribe.getSubscribe(callIdHeader.getCallId()); if (subscribe != null) { subscribe.response(evt); } } } } // trying不会回复 // if (status == Response.TRYING) { // } } /** src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java
@@ -21,6 +21,6 @@ // TODO 后续处理,只有第一次注册时调用查询设备信息,如需更新调用更新API接口 cmder.deviceInfoQuery(device); cmder.catalogQuery(device); cmder.catalogQuery(device, null); } } src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java
New file @@ -0,0 +1,37 @@ package com.genersoft.iot.vmp.gb28181.event; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.sip.ResponseEvent; import javax.sip.message.Request; import java.util.EventObject; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Component public class SipSubscribe { private final static Logger logger = LoggerFactory.getLogger(SipSubscribe.class); private Map<String, SipSubscribe.Event> allSubscribes = new ConcurrentHashMap<>(); public interface Event { void response(ResponseEvent event); } public void addSubscribe(String key, SipSubscribe.Event event) { allSubscribes.put(key, event); } public SipSubscribe.Event getSubscribe(String key) { return allSubscribes.get(key); } public int getSize(){ return allSubscribes.size(); } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
@@ -4,10 +4,13 @@ import javax.sip.ResponseEvent; import javax.sip.SipProvider; import javax.sip.header.CSeqHeader; import javax.sip.header.CallIdHeader; import javax.sip.header.Header; import javax.sip.message.Request; import javax.sip.message.Response; import com.alibaba.fastjson.JSON; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -83,7 +86,8 @@ @Autowired private OtherResponseProcessor otherResponseProcessor; // 注:这里使用注解会导致循环依赖注入,暂用springBean private SipProvider tcpSipProvider; @@ -94,6 +98,7 @@ Request request = evt.getRequest(); String method = request.getMethod(); // logger.info("接收到消息:"+request.getMethod()); // sipSubscribe.getSubscribe(evt.getServerTransaction().getBranchId()).response(evt); if (Request.INVITE.equals(method)) { InviteRequestProcessor processor = new InviteRequestProcessor(); processor.setRequestEvent(evt); @@ -145,6 +150,7 @@ } public ISIPResponseProcessor createResponseProcessor(ResponseEvent evt) { Response response = evt.getResponse(); CSeqHeader cseqHeader = (CSeqHeader) response.getHeader(CSeqHeader.NAME); String method = cseqHeader.getMethod(); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
@@ -2,6 +2,7 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; /** @@ -83,7 +84,7 @@ * @param device 视频设备 * @param channelId 预览通道 */ void playStreamCmd(Device device, String channelId, ZLMHttpHookSubscribe.Event event); void playStreamCmd(Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent); /** * 请求回放视频流 @@ -93,7 +94,7 @@ * @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss */ void playbackStreamCmd(Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event); void playbackStreamCmd(Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent); /** * 视频流停止 @@ -175,7 +176,7 @@ * * @param device 视频设备 */ boolean catalogQuery(Device device); boolean catalogQuery(Device device, SipSubscribe.Event errorEvent); /** * 查询录像信息 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -4,22 +4,22 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.sip.ClientTransaction; import javax.sip.Dialog; import javax.sip.InvalidArgumentException; import javax.sip.SipException; import javax.sip.SipProvider; import javax.sip.TransactionDoesNotExistException; import javax.sip.*; import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; import javax.sip.header.Header; import javax.sip.header.ViaHeader; import javax.sip.message.Request; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.MediaServerConfig; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; @@ -39,6 +39,8 @@ */ @Component public class SIPCommander implements ISIPCommander { private final Logger logger = LoggerFactory.getLogger(SIPCommander.class); @Autowired private SipConfig sipConfig; @@ -68,6 +70,9 @@ @Autowired private ZLMHttpHookSubscribe subscribe; @Autowired private SipSubscribe sipSubscribe; @@ -221,7 +226,7 @@ Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "ViaPtzBranch", "FromPtzTag", "ToPtzTag"); transmitRequest(device, request); transmitRequest(device, request, null); return true; } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); @@ -256,22 +261,23 @@ ptzXml.append("</Control>\r\n"); Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "ViaPtzBranch", "FromPtzTag", "ToPtzTag"); transmitRequest(device, request); transmitRequest(device, request, null); return true; } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); } return false; } /** * 请求预览视频流 * * 请求预览视频流 * @param device 视频设备 * @param channelId 预览通道 * @param event hook订阅 * @param errorEvent sip错误订阅 */ @Override public void playStreamCmd(Device device, String channelId, ZLMHttpHookSubscribe.Event event) { public void playStreamCmd(Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) { try { String ssrc = streamSession.createPlaySsrc(); @@ -300,7 +306,8 @@ // StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+channelId+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n"); // content.append("o="+channelId+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n"); content.append("o="+"00000"+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n"); content.append("s=Play\r\n"); content.append("c=IN IP4 "+mediaInfo.getWanIp()+"\r\n"); content.append("t=0 0\r\n"); @@ -332,7 +339,7 @@ Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, "live", null, ssrc); ClientTransaction transaction = transmitRequest(device, request); ClientTransaction transaction = transmitRequest(device, request, errorEvent); streamSession.put(streamId, transaction); DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId); if (deviceChannel != null) { @@ -357,7 +364,8 @@ * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss */ @Override public void playbackStreamCmd(Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event) { public void playbackStreamCmd(Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event , SipSubscribe.Event errorEvent) { try { MediaServerConfig mediaInfo = storager.getMediaInfo(); String ssrc = streamSession.createPlayBackSsrc(); @@ -413,8 +421,8 @@ content.append("y="+ssrc+"\r\n");//ssrc Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "playback", null); ClientTransaction transaction = transmitRequest(device, request); ClientTransaction transaction = transmitRequest(device, request, errorEvent); streamSession.put(streamId, transaction); } catch ( SipException | ParseException | InvalidArgumentException e) { @@ -575,7 +583,8 @@ catalogXml.append("</Query>\r\n"); Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "ViaDeviceInfoBranch", "FromDeviceInfoTag", "ToDeviceInfoTag"); transmitRequest(device, request); transmitRequest(device, request, null); } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); @@ -590,7 +599,7 @@ * @param device 视频设备 */ @Override public boolean catalogQuery(Device device) { public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) { // 清空通道 storager.cleanChannelsForDevice(device.getDeviceId()); try { @@ -602,8 +611,9 @@ catalogXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n"); catalogXml.append("</Query>\r\n"); Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "ViaCatalogBranch", "FromCatalogTag", "ToCatalogTag"); transmitRequest(device, request); Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "ViaCatalogBranch", "FromCatalogTag", null); transmitRequest(device, request, errorEvent); } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); return false; @@ -636,7 +646,9 @@ recordInfoXml.append("</Query>\r\n"); Request request = headerProvider.createMessageRequest(device, recordInfoXml.toString(), "ViaRecordInfoBranch", "FromRecordInfoTag", "ToRecordInfoTag"); transmitRequest(device, request); transmitRequest(device, request, null); } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); return false; @@ -688,13 +700,20 @@ return false; } private ClientTransaction transmitRequest(Device device, Request request) throws SipException { private ClientTransaction transmitRequest(Device device, Request request, SipSubscribe.Event errorEvent) throws SipException { ClientTransaction clientTransaction = null; if("TCP".equals(device.getTransport())) { clientTransaction = tcpSipProvider.getNewClientTransaction(request); } else if("UDP".equals(device.getTransport())) { clientTransaction = udpSipProvider.getNewClientTransaction(request); } // 添加订阅 if (errorEvent != null) { CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); sipSubscribe.addSubscribe(callIdHeader.getCallId(), errorEvent); } clientTransaction.sendRequest(); return clientTransaction; } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java
@@ -294,7 +294,7 @@ device.setStreamMode("UDP"); } storager.updateDevice(device); cmder.catalogQuery(device); cmder.catalogQuery(device, null); // 回复200 OK responseAck(evt); if (offLineDetector.isOnline(deviceId)) { src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -323,7 +323,7 @@ cmder.playStreamCmd(device, channelId, (JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString()); }); }, null); } } src/main/java/com/genersoft/iot/vmp/utils/SpringBeanFactory.java
@@ -34,6 +34,7 @@ * 获取对象 这里重写了bean方法,起主要作用 */ public static Object getBean(String beanId) throws BeansException { if (applicationContext == null) return null; return applicationContext.getBean(beanId); } src/main/java/com/genersoft/iot/vmp/vmanager/device/DeviceController.java
@@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.common.PageResult; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -18,6 +19,8 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import javax.sip.message.Response; @CrossOrigin @RestController @@ -86,11 +89,25 @@ if (logger.isDebugEnabled()) { } logger.debug("设备信息同步API调用,deviceId:" + deviceId); logger.debug("设备通道信息同步API调用,deviceId:" + deviceId); Device device = storager.queryVideoDevice(deviceId); cmder.catalogQuery(device); DeferredResult<ResponseEntity<Device>> result = new DeferredResult<ResponseEntity<Device>>(); cmder.catalogQuery(device, event -> { Response response = event.getResponse(); RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_CATALOG+deviceId); msg.setData(String.format("同步通道失败,错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase())); resultHolder.invokeResult(msg); }); DeferredResult<ResponseEntity<Device>> result = new DeferredResult<ResponseEntity<Device>>(2*1000L); result.onTimeout(()->{ logger.warn(String.format("设备通道信息同步超时")); // 释放rtpserver RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_CATALOG+deviceId); msg.setData("Timeout"); resultHolder.invokeResult(msg); }); resultHolder.put(DeferredResultHolder.CALLBACK_CMD_CATALOG+deviceId, result); return result; } src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java
@@ -28,6 +28,7 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import org.springframework.web.context.request.async.DeferredResult; import javax.sip.message.Response; import java.text.DecimalFormat; import java.util.UUID; @@ -72,6 +73,12 @@ cmder.playStreamCmd(device, channelId, (JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString()); }, event -> { RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); Response response = event.getResponse(); msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase())); resultHolder.invokeResult(msg); }); } else { String streamId = streamInfo.getStreamId(); @@ -86,6 +93,12 @@ cmder.playStreamCmd(device, channelId, (JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString()); }, event -> { RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); Response response = event.getResponse(); msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase())); resultHolder.invokeResult(msg); }); } } src/main/java/com/genersoft/iot/vmp/vmanager/playback/PlaybackController.java
@@ -27,6 +27,7 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import org.springframework.web.context.request.async.DeferredResult; import javax.sip.message.Response; import java.util.UUID; @CrossOrigin @@ -78,6 +79,12 @@ cmder.playbackStreamCmd(device, channelId, startTime, endTime, (JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); playService.onPublishHandlerForPlayBack(response, deviceId, channelId, uuid.toString()); }, event -> { Response response = event.getResponse(); RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); msg.setData(String.format("回放失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase())); resultHolder.invokeResult(msg); }); return result; web_src/src/components/videoList.vue
@@ -8,7 +8,7 @@ <div style="background-color: #FFFFFF; margin-bottom: 1rem; position: relative; padding: 0.5rem; text-align: left;"> <span style="font-size: 1rem; font-weight: bold;">设备列表</span> <div style="position: absolute; right: 1rem; top: 0.3rem;"> <el-button icon="el-icon-refresh-right" circle size="mini" @click="getDeviceList()"></el-button> <el-button icon="el-icon-refresh-right" circle size="mini" :loading="getDeviceListLoading" @click="getDeviceList()"></el-button> </div> </div> <devicePlayer ref="devicePlayer"></devicePlayer> @@ -51,7 +51,7 @@ <el-table-column label="操作" width="240" align="center" fixed="right"> <template slot-scope="scope"> <el-button size="mini" icon="el-icon-refresh" @click="refDevice(scope.row)">刷新通道</el-button> <el-button size="mini" :ref="scope.row.deviceId + 'refbtn' " icon="el-icon-refresh" @click="refDevice(scope.row)">刷新通道</el-button> <el-button size="mini" icon="el-icon-s-open" type="primary" @click="showChannelList(scope.row)">查看通道</el-button> </template> </el-table-column> @@ -90,7 +90,8 @@ winHeight: window.innerHeight - 200, currentPage:1, count:15, total:0 total:0, getDeviceListLoading: false }; }, computed: { @@ -130,7 +131,7 @@ }, getDeviceList: function() { let that = this; this.getDeviceListLoading = true; this.$axios.get(`/api/devices`,{ params: { page: that.currentPage - 1, @@ -141,9 +142,11 @@ console.log(res); that.total = res.data.total; that.deviceList = res.data.data; that.getDeviceListLoading = false; }) .catch(function (error) { console.log(error); that.getDeviceListLoading = false; }); }, @@ -158,17 +161,30 @@ refDevice: function(itemData) { ///api/devices/{deviceId}/sync console.log("刷新对应设备:" + itemData.deviceId); var that = this; that.$refs[itemData.deviceId + 'refbtn' ].loading = true; this.$axios({ method: 'post', url: '/api/devices/' + itemData.deviceId + '/sync' }).then(function(res) { // console.log("刷新设备结果:"+JSON.stringify(res)); console.log("刷新设备结果:"+JSON.stringify(res)); if (!res.data.deviceId) { that.$message({ showClose: true, message: res.data, type: 'error' }); }else{ that.$message({ showClose: true, message: '请求成功', type: 'success' }); } that.$refs[itemData.deviceId + 'refbtn' ].loading = false; }).catch(function(e) { that.$message({ showClose: true, message: '请求成功', type: 'success' }); console.error(e) that.$refs[itemData.deviceId + 'refbtn' ].loading = false; });; }, //通知设备上传媒体流