src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java
@@ -131,7 +131,7 @@
/**
 * Whether SSRC checking is enabled; enabling it can prevent streams from being mixed up.
 * Enabled by default (the field is initialized to true).
 */
private boolean ssrcCheck; private boolean ssrcCheck = true;
/** * 地理坐标系, 目前支持 WGS84,GCJ02 TODO CGCS2000 src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java
@@ -88,8 +88,8 @@ this.type = "timeout"; this.msg = "消息超时未回复"; this.statusCode = -1024; this.callId = timeoutEvent.getClientTransaction().getDialog().getCallId().getCallId(); this.dialog = timeoutEvent.getClientTransaction().getDialog(); this.callId = this.dialog != null?timeoutEvent.getClientTransaction().getDialog().getCallId().getCallId(): null; }else if (event instanceof TransactionTerminatedEvent) { TransactionTerminatedEvent transactionTerminatedEvent = (TransactionTerminatedEvent)event; this.type = "transactionTerminated";
@@ -109,8 +109,8 @@ this.type = "deviceNotFoundEvent"; this.msg = "设备未找到"; this.statusCode = -1024; this.callId = deviceNotFoundEvent.getDialog().getCallId().getCallId(); this.dialog = deviceNotFoundEvent.getDialog(); this.callId = this.dialog != null ?deviceNotFoundEvent.getDialog().getCallId().getCallId() : null; } } }
@@ -130,6 +130,9 @@ }
/**
 * Removes the error callback registered under {@code key}, together with its
 * entry in {@code errorTimeSubscribes}.
 *
 * @param key subscription key; a {@code null} key is ignored. The event-result
 *            fragments above can leave {@code callId} null when no dialog is
 *            available, so a null key is tolerated here.
 */
public void removeErrorSubscribe(String key) {
    if(key == null){
        return;
    }
    errorSubscribes.remove(key);
    errorTimeSubscribes.remove(key);
}
@@ -139,6 +142,9 @@ }
/**
 * Removes the success callback registered under {@code key}, together with its
 * entry in {@code okTimeSubscribes}.
 *
 * @param key subscription key; a {@code null} key is ignored
 */
public void removeOkSubscribe(String key) {
    if(key == null){
        return;
    }
    okSubscribes.remove(key);
    okTimeSubscribes.remove(key);
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
@@ -143,6 +143,14 @@ * 回放倍速播放 */ void playSpeedCmd(Device device, StreamInfo streamInfo, Double speed);
/**
 * Playback control.
 *
 * @param device     device the control command is sent to
 * @param streamInfo stream the control command applies to
 * @param content    control content carried by the request
 * @param errorEvent callback for a failed request
 * @param okEvent    callback for a successful request
 */
void playbackControlCmd(Device device, StreamInfo streamInfo, String content,SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent);
/** * 语音广播 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -1828,6 +1828,43 @@ e.printStackTrace(); } } @Override public void playbackControlCmd(Device device, StreamInfo streamInfo, String content,SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) { try { Request request = headerProvider.createInfoRequest(device, streamInfo, content); if (request == null) { return; } logger.info(request.toString()); ClientTransaction clientTransaction = null; if ("TCP".equals(device.getTransport())) { clientTransaction = tcpSipProvider.getNewClientTransaction(request); } else if ("UDP".equals(device.getTransport())) { clientTransaction = udpSipProvider.getNewClientTransaction(request); } CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); if(errorEvent != null) { sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (eventResult -> { errorEvent.response(eventResult); sipSubscribe.removeErrorSubscribe(eventResult.callId); sipSubscribe.removeOkSubscribe(eventResult.callId); })); } if(okEvent != null) { sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), eventResult -> { okEvent.response(eventResult); sipSubscribe.removeOkSubscribe(eventResult.callId); sipSubscribe.removeErrorSubscribe(eventResult.callId); }); } clientTransaction.sendRequest(); } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); } } @Override public boolean sendAlarmMessage(Device device, DeviceAlarm deviceAlarm) { src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java
New file @@ -0,0 +1,143 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.info;

import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.*;
import javax.sip.message.Response;
import java.text.ParseException;

/**
 * Processor for incoming SIP INFO requests.
 *
 * <p>An INFO request whose body is {@code Application/MANSRTSP} is treated as playback control:
 * the stream is looked up by the request's Call-ID, the body is relayed to the device that owns
 * the stream through {@link SIPCommander#playbackControlCmd}, and the original request is
 * answered from the command's success/failure callbacks.
 */
@Component
public class InfoRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {

    private final static Logger logger = LoggerFactory.getLogger(InfoRequestProcessor.class);

    private final String method = "INFO";

    @Autowired
    private SIPProcessorObserver sipProcessorObserver;

    // Single injection point; the class previously injected this bean twice under two names.
    @Autowired
    private IVideoManagerStorage storage;

    @Autowired
    private SipSubscribe sipSubscribe;

    @Autowired
    private IRedisCatchStorage redisCatchStorage;

    @Autowired
    private SIPCommander cmder;

    @Autowired
    private VideoStreamSessionManager sessionManager;

    @Override
    public void afterPropertiesSet() throws Exception {
        // Register this processor for INFO requests.
        sipProcessorObserver.addRequestProcessor(method, this);
    }

    /**
     * Resolves the sender of the INFO request and either replies 404 when it is unknown or
     * hands the request over to the playback-control handling.
     *
     * @param evt the received request event
     */
    @Override
    public void process(RequestEvent evt) {
        logger.debug("接收到消息:{}", evt.getRequest());
        String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
        CallIdHeader callIdHeader = (CallIdHeader) evt.getRequest().getHeader(CallIdHeader.NAME);
        String callId = callIdHeader.getCallId();

        // Look in the session first.
        SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransaction(null, null, callId, null);
        if (ssrcTransaction != null) {
            // Hikvision compatibility: in its media notifications the From field is not the device ID.
            deviceId = ssrcTransaction.getDeviceId();
        }
        // Is there a device with this ID?
        Device device = redisCatchStorage.getDevice(deviceId);
        // Is there a parent platform with this ID?
        ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(deviceId);
        try {
            if (device != null && parentPlatform != null) {
                // Same ID used by a device and a platform: decide by the sender's address.
                logger.warn("[重复]平台与设备编号重复:{}", deviceId);
                SIPRequest request = (SIPRequest) evt.getRequest();
                String hostAddress = request.getRemoteAddress().getHostAddress();
                int remotePort = request.getRemotePort();
                if (device.getHostAddress().equals(hostAddress + ":" + remotePort)) {
                    parentPlatform = null;
                } else {
                    device = null;
                }
            }
            if (device == null && parentPlatform == null) {
                // Unknown sender: reply 404 and notify a pending error subscriber, if any.
                responseAck(evt, Response.NOT_FOUND, "device " + deviceId + " not found");
                logger.warn("[设备未找到 ]: {}", deviceId);
                SipSubscribe.Event errorSubscribe = sipSubscribe.getErrorSubscribe(callId);
                if (errorSubscribe != null) {
                    errorSubscribe.response(new SipSubscribe.EventResult(new DeviceNotFoundEvent(evt.getDialog())));
                }
                return;
            }
            handlePlaybackControl(evt, callId);
        } catch (SipException e) {
            logger.warn("SIP 回复错误", e);
        } catch (InvalidArgumentException e) {
            logger.warn("参数无效", e);
        } catch (ParseException e) {
            logger.warn("SIP回复时解析异常", e);
        }
    }

    /**
     * Relays an Application/MANSRTSP body to the device that owns the stream bound to {@code callId}.
     * Every path that cannot relay the command answers the request explicitly.
     */
    private void handlePlaybackControl(RequestEvent evt, String callId)
            throws SipException, InvalidArgumentException, ParseException {
        ContentTypeHeader header = (ContentTypeHeader) evt.getRequest().getHeader(ContentTypeHeader.NAME);
        // Media types are case-insensitive. Anything other than Application/MANSRTSP
        // (including a missing Content-Type) is not handled here; answer explicitly rather
        // than leaving the sender to retransmit until it times out.
        if (header == null
                || !"Application".equalsIgnoreCase(header.getContentType())
                || !"MANSRTSP".equalsIgnoreCase(header.getContentSubType())) {
            responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE);
            return;
        }

        SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callId);
        if (sendRtpItem == null) {
            responseAck(evt, Response.NOT_FOUND, "send rtp for call " + callId + " not found");
            return;
        }
        String streamId = sendRtpItem.getStreamId();
        StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);
        if (null == streamInfo) {
            responseAck(evt, Response.NOT_FOUND, "stream " + streamId + " not found");
            return;
        }
        Device targetDevice = storage.queryVideoDevice(streamInfo.getDeviceID());
        if (targetDevice == null) {
            responseAck(evt, Response.NOT_FOUND, "device " + streamInfo.getDeviceID() + " not found");
            return;
        }
        byte[] rawContent = evt.getRequest().getRawContent();
        if (rawContent == null) {
            responseAck(evt, Response.BAD_REQUEST, "empty MANSRTSP body");
            return;
        }

        cmder.playbackControlCmd(targetDevice, streamInfo, new String(rawContent),
                eventResult -> replyFailure(evt, eventResult),
                eventResult -> replySuccess(evt, eventResult));
    }

    /** Answers the original INFO request with the failure status and message of the relayed command. */
    private void replyFailure(RequestEvent evt, SipSubscribe.EventResult eventResult) {
        try {
            responseAck(evt, eventResult.statusCode, eventResult.msg);
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.warn("failed to reply to INFO request after playback control error", e);
        }
    }

    /** Answers the original INFO request with the success status of the relayed command. */
    private void replySuccess(RequestEvent evt, SipSubscribe.EventResult eventResult) {
        try {
            responseAck(evt, eventResult.statusCode);
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.warn("failed to reply to INFO request after playback control success", e);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceStatusResponseMessageHandler.java
@@ -82,7 +82,7 @@ deviceService.offline(device.getDeviceId()); } RequestMessage msg = new RequestMessage(); msg.setKey(DeferredResultHolder.CALLBACK_CMD_DEVICESTATUS + device.getDeviceId() + channelId); msg.setKey(DeferredResultHolder.CALLBACK_CMD_DEVICESTATUS + device.getDeviceId()); msg.setData(json); deferredResultHolder.invokeAllResult(msg); } src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java
@@ -255,6 +255,8 @@ }else if (deviceChannel.getChannelId().length() == 20) { if (Integer.parseInt(deviceChannel.getChannelId().substring(10, 13)) == 216) { // 虚拟组织 deviceChannel.setParentId(businessGroupID); }else if (Integer.parseInt(device.getDeviceId().substring(10, 13) )== 118) {//NVR 如果上级设备编号是NVR则直接将NVR的编号设置给通道的上级编号 deviceChannel.setParentId(device.getDeviceId()); }else if (deviceChannel.getCivilCode() != null) { // 设备, 无parentId的20位是使用CivilCode表示上级的设备, // 注:215 业务分组是需要有parentId的 src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
@@ -342,6 +342,11 @@ Device device = storager.queryVideoDevice(deviceId); String uuid = UUID.randomUUID().toString(); String key = DeferredResultHolder.CALLBACK_CMD_DEVICESTATUS + deviceId; DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>(2*1000L); if(device == null) { result.setResult(new ResponseEntity(String.format("设备%s不存在", deviceId),HttpStatus.OK)); return result; } cmder.deviceStatusQuery(device, event -> { RequestMessage msg = new RequestMessage(); msg.setId(uuid); @@ -349,7 +354,6 @@ msg.setData(String.format("获取设备状态失败,错误码: %s, %s", event.statusCode, event.msg)); resultHolder.invokeResult(msg); }); DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>(2*1000L); result.onTimeout(()->{ logger.warn(String.format("获取设备状态超时")); // 释放rtpserver