src/main/java/com/genersoft/iot/vmp/gb28181/event/DeviceOffLineDetector.java
New file @@ -0,0 +1,24 @@ package com.genersoft.iot.vmp.gb28181.event; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.utils.redis.RedisUtil; /** * @Description:设备离在线状态检测器,用于检测设备状态 * @author: songww * @date: 2020年5月13日 下午2:40:29 */ @Component public class DeviceOffLineDetector { @Autowired private RedisUtil redis; public boolean isOnline(String deviceId) { String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + deviceId; return redis.hasKey(key); } } src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
@@ -4,8 +4,8 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.gb28181.event.offline.OfflineEvent; import com.genersoft.iot.vmp.gb28181.event.online.OnlineEvent; import com.genersoft.iot.vmp.gb28181.event.outline.OutlineEvent; /** * @Description:Event事件通知推送器,支持推送在线事件、离线事件 @@ -26,7 +26,7 @@ } public void outlineEventPublish(String deviceId, String from){ OutlineEvent outEvent = new OutlineEvent(this); OfflineEvent outEvent = new OfflineEvent(this); outEvent.setDeviceId(deviceId); outEvent.setFrom(from); applicationEventPublisher.publishEvent(outEvent); src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepliveTimeoutListener.java
File was renamed from src/main/java/com/genersoft/iot/vmp/gb28181/event/outline/KeepliveTimeoutListener.java @@ -1,4 +1,4 @@ package com.genersoft.iot.vmp.gb28181.event.outline; package com.genersoft.iot.vmp.gb28181.event.offline; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEvent.java
File was renamed from src/main/java/com/genersoft/iot/vmp/gb28181/event/outline/OutlineEvent.java @@ -1,4 +1,4 @@ package com.genersoft.iot.vmp.gb28181.event.outline; package com.genersoft.iot.vmp.gb28181.event.offline; import org.springframework.context.ApplicationEvent; @@ -7,7 +7,7 @@ * @author: songww * @date: 2020年5月6日 上午11:33:13 */ public class OutlineEvent extends ApplicationEvent { public class OfflineEvent extends ApplicationEvent { /** * @Title: OutlineEvent @@ -15,7 +15,7 @@ * @param: @param source * @throws */ public OutlineEvent(Object source) { public OfflineEvent(Object source) { super(source); } src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java
File was renamed from src/main/java/com/genersoft/iot/vmp/gb28181/event/outline/OutlineEventListener.java @@ -1,4 +1,4 @@ package com.genersoft.iot.vmp.gb28181.event.outline; package com.genersoft.iot.vmp.gb28181.event.offline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -13,14 +13,14 @@ /** * @Description: 离线事件监听器,监听到离线后,修改设备离在线状态。 设备离线有两个来源: * 1、设备主动注销,发送注销指令,{@link com.genersoft.iot.vmp.gb28181.transmit.request.impl.RegisterRequestProcessor} * 2、设备未知原因离线,心跳超时,{@link com.genersoft.iot.vmp.gb28181.event.outline.OutlineEventListener} * 2、设备未知原因离线,心跳超时,{@link com.genersoft.iot.vmp.gb28181.event.offline.OfflineEventListener} * @author: songww * @date: 2020年5月6日 下午1:51:23 */ @Component public class OutlineEventListener implements ApplicationListener<OutlineEvent> { public class OfflineEventListener implements ApplicationListener<OfflineEvent> { private final static Logger logger = LoggerFactory.getLogger(OutlineEventListener.class); private final static Logger logger = LoggerFactory.getLogger(OfflineEventListener.class); @Autowired private IVideoManagerStorager storager; @@ -29,7 +29,7 @@ private RedisUtil redis; @Override public void onApplicationEvent(OutlineEvent event) { public void onApplicationEvent(OfflineEvent event) { if (logger.isDebugEnabled()) { logger.debug("设备离线事件触发,deviceId:" + event.getDeviceId() + ",from:" + event.getFrom()); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
@@ -79,7 +79,7 @@ * @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss */ public String playbackStreamCmd(Device device,String channelId, String recordId, String startTime, String endTime); public String playbackStreamCmd(Device device,String channelId, String startTime, String endTime); /** * 语音广播 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -1,14 +1,13 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import java.text.ParseException; import java.util.Random; import javax.sip.ClientTransaction; import javax.sip.InvalidArgumentException; import javax.sip.SipException; import javax.sip.message.Request; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.conf.SipConfig; @@ -18,8 +17,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider; import com.genersoft.iot.vmp.gb28181.utils.DateUtil; import com.genersoft.iot.vmp.gb28181.utils.SsrcUtil; import tk.mybatis.mapper.util.StringUtil; /** * @Description:设备能力接口,用于定义设备的控制、查询能力 @@ -181,16 +178,16 @@ * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss */ @Override public String playbackStreamCmd(Device device, String channelId, String recordId, String startTime, String endTime) { public String playbackStreamCmd(Device device, String channelId, String startTime, String endTime) { try { String ssrc = SsrcUtil.getPlayBackSsrc(); // StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+channelId+" 0 0 IN IP4 "+sipConfig.getSipIp()+"\r\n"); content.append("o="+device.getDeviceId()+" 0 0 IN IP4 "+sipConfig.getSipIp()+"\r\n"); content.append("s=Playback\r\n"); content.append("u="+recordId+":3\r\n"); content.append("u="+channelId+":3\r\n"); content.append("c=IN IP4 "+sipConfig.getMediaIp()+"\r\n"); content.append("t="+DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime)+" "+DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime) +"\r\n"); if(device.getTransport().equals("TCP")) { @@ -439,11 +436,15 @@ } private void transmitRequest(Device device, Request request) throws SipException { ClientTransaction clientTransaction = null; if(device.getTransport().equals("TCP")) { sipLayer.getTcpSipProvider().sendRequest(request); clientTransaction = sipLayer.getTcpSipProvider().getNewClientTransaction(request); //sipLayer.getTcpSipProvider().sendRequest(request); } else if(device.getTransport().equals("UDP")) { sipLayer.getUdpSipProvider().sendRequest(request); clientTransaction = sipLayer.getUdpSipProvider().getNewClientTransaction(request); //sipLayer.getUdpSipProvider().sendRequest(request); } clientTransaction.sendRequest(); } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java
@@ -30,6 +30,7 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.gb28181.bean.RecordItem; import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; @@ -69,7 +70,20 @@ @Autowired private DeferredResultHolder deferredResultHolder; @Autowired private DeviceOffLineDetector offLineDetector; private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_"; private static final String MESSAGE_CATALOG = "Catalog"; private static final String MESSAGE_DEVICE_INFO = "DeviceInfo"; private static final String MESSAGE_KEEP_ALIVE = "Keepalive"; private static final String MESSAGE_ALARM = "Alarm"; private static final String MESSAGE_RECORD_INFO = "RecordInfo"; // private static final String MESSAGE_BROADCAST = "Broadcast"; // private static final String MESSAGE_DEVICE_STATUS = "DeviceStatus"; // private static final String MESSAGE_MOBILE_POSITION = "MobilePosition"; // private static final String MESSAGE_MOBILE_POSITION_INTERVAL = "Interval"; /** * 处理MESSAGE请求 @@ -85,22 +99,31 @@ this.transaction = transaction; Request request = evt.getRequest(); SAXReader reader = new SAXReader(); Document xml; try { xml = reader.read(new ByteArrayInputStream(request.getRawContent())); Element rootElement = xml.getRootElement(); String cmd = rootElement.element("CmdType").getStringValue(); if (new String(request.getRawContent()).contains("<CmdType>Keepalive</CmdType>")) { if (MESSAGE_KEEP_ALIVE.equals(cmd)) { logger.info("接收到KeepAlive消息"); processMessageKeepAlive(evt); } else if (new String(request.getRawContent()).contains("<CmdType>Catalog</CmdType>")) { } else if (MESSAGE_CATALOG.equals(cmd)) { logger.info("接收到Catalog消息"); processMessageCatalogList(evt); } else if (new String(request.getRawContent()).contains("<CmdType>DeviceInfo</CmdType>")) { } else if (MESSAGE_DEVICE_INFO.equals(cmd)) { logger.info("接收到DeviceInfo消息"); processMessageDeviceInfo(evt); } else if (new String(request.getRawContent()).contains("<CmdType>Alarm</CmdType>")) { } else if (MESSAGE_ALARM.equals(cmd)) { logger.info("接收到Alarm消息"); processMessageAlarm(evt); } else if (new String(request.getRawContent()).contains("<CmdType>RecordInfo</CmdType>")) { } else if (MESSAGE_RECORD_INFO.equals(cmd)) { logger.info("接收到RecordInfo消息"); processMessageRecordInfo(evt); } } catch (DocumentException e) { e.printStackTrace(); } } @@ -247,12 +270,17 @@ */ private void processMessageKeepAlive(RequestEvent evt){ try { Request request = evt.getRequest(); Response response = layer.getMessageFactory().createResponse(Response.OK,request); Element rootElement = getRootElement(evt); Element deviceIdElement = rootElement.element("DeviceID"); String deviceId = XmlUtil.getText(rootElement,"DeviceID"); Request request = evt.getRequest(); Response response = null; if (offLineDetector.isOnline(deviceId)) { response = layer.getMessageFactory().createResponse(Response.OK,request); publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE); } else { response = layer.getMessageFactory().createResponse(Response.BAD_REQUEST,request); } transaction.sendResponse(response); publisher.onlineEventPublish(deviceIdElement.getText(), VideoManagerConstants.EVENT_ONLINE_KEEPLIVE); } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { e.printStackTrace(); } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/response/impl/InviteResponseProcessor.java
@@ -1,15 +1,25 @@ package com.genersoft.iot.vmp.gb28181.transmit.response.impl; import java.text.ParseException; import javax.sip.ClientTransaction; import javax.sip.Dialog; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; import javax.sip.address.SipURI; import javax.sip.header.CSeqHeader; import javax.sip.header.ViaHeader; import javax.sip.message.Request; import javax.sip.message.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorFactory; import com.genersoft.iot.vmp.gb28181.transmit.response.ISIPResponseProcessor; /** @@ -20,20 +30,51 @@ @Component public class InviteResponseProcessor implements ISIPResponseProcessor { private final static Logger logger = LoggerFactory.getLogger(SIPProcessorFactory.class); /** * 处理invite响应 * * @param request * @param evt * 响应消息 */ @Override public void process(ResponseEvent evt, SipLayer layer, SipConfig config) { try { Dialog dialog = evt.getDialog(); Request reqAck =dialog.createAck(1L); dialog.sendAck(reqAck); Response response = evt.getResponse(); int statusCode = response.getStatusCode(); //trying不会回复 if(statusCode == Response.TRYING){ } //成功响应 //下发ack if(statusCode == Response.OK){ ClientTransaction clientTransaction = evt.getClientTransaction(); if(clientTransaction == null){ logger.error("回复ACK时,clientTransaction为null >>> {}",response); return; } Dialog clientDialog = clientTransaction.getDialog(); CSeqHeader clientCSeqHeader = (CSeqHeader) response.getHeader(CSeqHeader.NAME); long cseqId = clientCSeqHeader.getSeqNumber(); /* createAck函数,创建的ackRequest,会采用Invite响应的200OK,中的contact字段中的地址,作为目标地址。 有的终端传上来的可能还是内网地址,会造成ack发送不出去。接受不到音视频流 所以在此处统一替换地址。和响应消息的Via头中的地址保持一致。 */ Request ackRequest = clientDialog.createAck(cseqId); SipURI requestURI = (SipURI) ackRequest.getRequestURI(); ViaHeader viaHeader = (ViaHeader) response.getHeader(ViaHeader.NAME); requestURI.setHost(viaHeader.getHost()); requestURI.setPort(viaHeader.getPort()); clientDialog.sendAck(ackRequest); } } catch (InvalidArgumentException | SipException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } } src/main/java/com/genersoft/iot/vmp/vmanager/playback/PlaybackController.java
@@ -10,6 +10,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; @@ -30,7 +31,7 @@ public ResponseEntity<String> play(@PathVariable String deviceId,@PathVariable String channelId, String startTime, String endTime){ Device device = storager.queryVideoDevice(deviceId); String ssrc = cmder.playStreamCmd(device, channelId); String ssrc = cmder.playbackStreamCmd(device, channelId, startTime, endTime); if (logger.isDebugEnabled()) { logger.debug(String.format("设备预览 API调用,deviceId:%s ,channelId:%s",deviceId, channelId)); @@ -38,7 +39,9 @@ } if(ssrc!=null) { return new ResponseEntity<String>(ssrc,HttpStatus.OK); JSONObject json = new JSONObject(); json.put("ssrc", ssrc); return new ResponseEntity<String>(json.toString(),HttpStatus.OK); } else { logger.warn("设备预览API调用失败!"); return new ResponseEntity<String>(HttpStatus.INTERNAL_SERVER_ERROR); src/main/resources/application.yml
@@ -26,7 +26,7 @@ server: port: 8080 sip: ip: 10.200.64.63 ip: 127.0.0.1 port: 5060 # 根据国标6.1.2中规定,domain宜采用ID统一编码的前十位编码。国标附录D中定义前8位为中心编码(由省级、市级、区级、基层编号组成,参照GB/T 2260-2007) # 后两位为行业编码,定义参照附录D.3