src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
New file @@ -0,0 +1,42 @@
package com.genersoft.iot.vmp.conf;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

/**
 * Dynamic scheduled tasks.
 *
 * <p>Keeps one cron-triggered task per string key. Starting a task under a key
 * that is already in use cancels the previous task first.
 *
 * <p>NOTE(review): this component both declares the {@code threadPoolTaskScheduler}
 * bean and injects it into itself; confirm this self-reference is accepted by the
 * Spring version in use.
 */
@Component
public class DynamicTask {

    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    /** Handles of the scheduled tasks, indexed by the caller-supplied key. */
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    /**
     * Declares the scheduler bean used to run the tasks.
     *
     * @return a new {@link ThreadPoolTaskScheduler}
     */
    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        return new ThreadPoolTaskScheduler();
    }

    /**
     * Schedules {@code task} with the given cron expression, replacing any task
     * previously registered under {@code key}.
     *
     * @param key  identifier of the task
     * @param task work to run on every trigger
     * @param corn cron expression (the parameter name is a historical misspelling of "cron")
     * @return the constant string {@code "startCron"}
     */
    public String startCron(String key, Runnable task, String corn) {
        stopCron(key);
        ScheduledFuture<?> future = threadPoolTaskScheduler.schedule(task, new CronTrigger(corn));
        // schedule() may return null when the trigger never fires; ConcurrentHashMap rejects null values.
        if (future != null) {
            futureMap.put(key, future);
        }
        return "startCron";
    }

    /**
     * Cancels the task registered under {@code key}, if any, and forgets its handle.
     *
     * @param key identifier of the task to stop
     */
    public void stopCron(String key) {
        // Remove rather than get, so cancelled futures do not accumulate in the map.
        ScheduledFuture<?> future = futureMap.remove(key);
        if (future != null && !future.isCancelled()) {
            future.cancel(true);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/ISIPRequestProcessor.java
New file @@ -0,0 +1,14 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request;

import javax.sip.RequestEvent;

/**
 * @description: Handles SIP events, including request, response, timeout, ioException,
 *               transactionTerminated and dialogTerminated.
 *               NOTE(review): the interface itself only declares handling of request events.
 * @author: panlinlin
 * @date: 2021-11-05 15:47
 */
public interface ISIPRequestProcessor {

    /**
     * Handles one incoming SIP request event.
     *
     * @param event the request event to handle
     */
    void process(RequestEvent event);
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorAbstract.java
New file @@ -0,0 +1,179 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request;

import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.stack.SIPServerTransaction;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.xml.sax.SAXException;

import javax.sip.*;
import javax.sip.address.Address;
import javax.sip.address.AddressFactory;
import javax.sip.address.SipURI;
import javax.sip.header.ContentTypeHeader;
import javax.sip.header.HeaderFactory;
import javax.sip.header.ViaHeader;
import javax.sip.message.MessageFactory;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.io.ByteArrayInputStream;
import java.text.ParseException;

/**
 * @description: Base class for processors of SIP request messages received from IP cameras.
 *               Provides helpers to obtain the server transaction, send responses and
 *               parse the XML body of a request.
 * @author: songww
 * @date: 2020-05-03 16:42:22
 */
public abstract class SIPRequestProcessorAbstract implements InitializingBean, ISIPRequestProcessor {

    private final static Logger logger = LoggerFactory.getLogger(SIPRequestProcessorAbstract.class);

    /** Charset used for XML bodies when the caller does not supply one. */
    private static final String DEFAULT_CHARSET = "gb2312";

    @Autowired
    @Qualifier(value="tcpSipProvider")
    private SipProviderImpl tcpSipProvider;

    @Autowired
    @Qualifier(value="udpSipProvider")
    private SipProviderImpl udpSipProvider;

    /**
     * Returns the server transaction for a request event, looking it up in the SIP
     * stack or creating a new one when the event does not carry one.
     *
     * <p>The TCP or UDP provider is chosen from the transport of the request's Via
     * header; a missing Via header is treated as UDP.
     *
     * @param evt the request event
     * @return the server transaction, or {@code null} if one could not be created
     */
    public ServerTransaction getServerTransaction(RequestEvent evt) {
        ServerTransaction serverTransaction = evt.getServerTransaction();
        if (serverTransaction != null) {
            return serverTransaction;
        }

        Request request = evt.getRequest();
        ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
        // Transport tokens are case-insensitive; constant-first comparison is also null-safe.
        boolean isTcp = reqViaHeader != null && "TCP".equalsIgnoreCase(reqViaHeader.getTransport());
        SipProviderImpl provider = isTcp ? tcpSipProvider : udpSipProvider;

        try {
            SipStackImpl stack = (SipStackImpl) provider.getSipStack();
            serverTransaction = (SIPServerTransaction) stack.findTransaction((SIPRequest) request, true);
            if (serverTransaction == null) {
                serverTransaction = provider.getNewServerTransaction(request);
            }
        } catch (TransactionAlreadyExistsException | TransactionUnavailableException e) {
            logger.error("Failed to obtain server transaction: {}", e.getMessage(), e);
        }
        return serverTransaction;
    }

    /**
     * @return a JAIN-SIP address factory, or {@code null} if it cannot be created
     */
    public AddressFactory getAddressFactory() {
        try {
            return SipFactory.getInstance().createAddressFactory();
        } catch (PeerUnavailableException e) {
            logger.error("Failed to create SIP AddressFactory", e);
        }
        return null;
    }

    /**
     * @return a JAIN-SIP header factory, or {@code null} if it cannot be created
     */
    public HeaderFactory getHeaderFactory() {
        try {
            return SipFactory.getInstance().createHeaderFactory();
        } catch (PeerUnavailableException e) {
            logger.error("Failed to create SIP HeaderFactory", e);
        }
        return null;
    }

    /**
     * @return a JAIN-SIP message factory, or {@code null} if it cannot be created
     */
    public MessageFactory getMessageFactory() {
        try {
            return SipFactory.getInstance().createMessageFactory();
        } catch (PeerUnavailableException e) {
            logger.error("Failed to create SIP MessageFactory", e);
        }
        return null;
    }

    /**
     * Replies to the request with the given status code (for example 100 Trying,
     * 200 OK, 400, 404). For status codes of 200 and above the dialog of the
     * transaction, if any, is deleted after sending.
     *
     * @param evt        the request event being answered
     * @param statusCode SIP status code of the response
     * @throws SipException             if the response cannot be sent, or no message
     *                                  factory / server transaction is available
     * @throws InvalidArgumentException if the stack rejects the response
     * @throws ParseException           if the response cannot be built
     */
    public void responseAck(RequestEvent evt, int statusCode) throws SipException, InvalidArgumentException, ParseException {
        Response response = createResponse(statusCode, evt.getRequest());
        sendAndReleaseDialog(evt, response, statusCode);
    }

    /**
     * Same as {@link #responseAck(RequestEvent, int)} but with a custom reason phrase.
     *
     * @param evt        the request event being answered
     * @param statusCode SIP status code of the response
     * @param msg        reason phrase to put on the status line
     * @throws SipException             if the response cannot be sent, or no message
     *                                  factory / server transaction is available
     * @throws InvalidArgumentException if the stack rejects the response
     * @throws ParseException           if the response or reason phrase cannot be built
     */
    public void responseAck(RequestEvent evt, int statusCode, String msg) throws SipException, InvalidArgumentException, ParseException {
        Response response = createResponse(statusCode, evt.getRequest());
        response.setReasonPhrase(msg);
        sendAndReleaseDialog(evt, response, statusCode);
    }

    /**
     * Replies 200 OK carrying an SDP body. A Contact header is built from the user
     * and host of the request URI; the port is appended only when the URI has one.
     * The dialog is left in place.
     *
     * @param evt the request event being answered
     * @param sdp SDP text to send as the response body
     * @throws SipException             if the response cannot be sent, or no message
     *                                  factory / server transaction is available
     * @throws InvalidArgumentException if the stack rejects the response
     * @throws ParseException           if the response, content or contact cannot be built
     */
    public void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
        Response response = createResponse(Response.OK, evt.getRequest());
        SipFactory sipFactory = SipFactory.getInstance();
        HeaderFactory headerFactory = sipFactory.createHeaderFactory();
        AddressFactory addressFactory = sipFactory.createAddressFactory();

        ContentTypeHeader contentTypeHeader = headerFactory.createContentTypeHeader("APPLICATION", "SDP");
        response.setContent(sdp, contentTypeHeader);

        SipURI sipURI = (SipURI) evt.getRequest().getRequestURI();
        String hostPort = sipURI.getHost();
        // getPort() reports -1 when the URI has no explicit port; do not emit "host:-1".
        if (sipURI.getPort() > 0) {
            hostPort = hostPort + ":" + sipURI.getPort();
        }
        Address concatAddress = addressFactory.createAddress(addressFactory.createSipURI(sipURI.getUser(), hostPort));
        response.addHeader(headerFactory.createContactHeader(concatAddress));

        requireServerTransaction(evt).sendResponse(response);
    }

    /**
     * Parses the XML body of the request using the default charset (gb2312).
     *
     * @param evt the request event whose body is parsed
     * @return the root element of the XML body
     * @throws DocumentException if the body is missing or is not well-formed XML
     */
    public Element getRootElement(RequestEvent evt) throws DocumentException {
        return getRootElement(evt, DEFAULT_CHARSET);
    }

    /**
     * Parses the XML body of the request.
     *
     * @param evt     the request event whose body is parsed
     * @param charset charset of the body; {@code null} selects gb2312
     * @return the root element of the XML body
     * @throws DocumentException if the body is missing or is not well-formed XML
     */
    public Element getRootElement(RequestEvent evt, String charset) throws DocumentException {
        if (charset == null) {
            charset = DEFAULT_CHARSET;
        }
        byte[] rawContent = evt.getRequest().getRawContent();
        if (rawContent == null || rawContent.length == 0) {
            // Report a parse failure the callers already handle instead of a NullPointerException.
            throw new DocumentException("SIP request has no XML body");
        }
        SAXReader reader = new SAXReader();
        reader.setEncoding(charset);
        disableExternalEntities(reader);
        Document xml = reader.read(new ByteArrayInputStream(rawContent));
        return xml.getRootElement();
    }

    /** Builds a response for the request, failing with SipException when no message factory exists. */
    private Response createResponse(int statusCode, Request request) throws SipException, ParseException {
        MessageFactory messageFactory = getMessageFactory();
        if (messageFactory == null) {
            throw new SipException("SIP MessageFactory is unavailable");
        }
        return messageFactory.createResponse(statusCode, request);
    }

    /** Returns the server transaction for the event, failing with SipException when there is none. */
    private ServerTransaction requireServerTransaction(RequestEvent evt) throws SipException {
        ServerTransaction serverTransaction = getServerTransaction(evt);
        if (serverTransaction == null) {
            throw new SipException("No server transaction available for the request");
        }
        return serverTransaction;
    }

    /** Sends the response and, for status codes of 200 and above, deletes the transaction's dialog. */
    private void sendAndReleaseDialog(RequestEvent evt, Response response, int statusCode)
            throws SipException, InvalidArgumentException {
        ServerTransaction serverTransaction = requireServerTransaction(evt);
        serverTransaction.sendResponse(response);
        if (statusCode >= 200) {
            Dialog dialog = serverTransaction.getDialog();
            if (dialog != null) {
                dialog.delete();
            }
        }
    }

    /**
     * Best-effort hardening against XML external entity (XXE) injection: the body
     * comes from the network. A parser that does not know a feature only triggers a
     * warning so that message parsing keeps working.
     */
    private static void disableExternalEntities(SAXReader reader) {
        try {
            reader.setFeature("http://xml.org/sax/features/external-general-entities", false);
            reader.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
            reader.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false);
        } catch (SAXException e) {
            logger.warn("XML parser does not support disabling external entities: {}", e.getMessage());
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
New file @@ -0,0 +1,123 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;

import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.RequestEvent;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import java.util.HashMap;
import java.util.Map;

/**
 * @description: ACK request processor. Once the dialog is confirmed it waits for the
 *               source stream to become available and then asks the media server to
 *               start sending RTP to the requester.
 * @author: swwheihei
 * @date: 2020-05-03 17:31:45
 */
@Component
public class AckRequestProcessor extends SIPRequestProcessorAbstract {

    private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class);
    private String method = "ACK";

    /** Maximum time to wait for the source stream before giving up. */
    private static final long STREAM_READY_TIMEOUT_MS = 30 * 1000;

    /** Pause between two checks of the source stream. */
    private static final long STREAM_READY_POLL_INTERVAL_MS = 1000;

    @Autowired
    private SIPProcessorObserver sipProcessorObserver;

    /** Registers this processor for the ACK method. */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Subscribe to message processing.
        sipProcessorObserver.addRequestProcessor(method, this);
    }

    @Autowired
    private IRedisCatchStorage redisCatchStorage;

    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;

    @Autowired
    private IMediaServerService mediaServerService;

    /**
     * Handles an ACK request.
     *
     * <p>Does nothing unless the event has a dialog in CONFIRMED state and a send-RTP
     * record exists for the From user / To user pair. The calling thread is blocked,
     * polling once per second for up to 30 seconds, until the stream is ready.
     *
     * @param evt the ACK request event
     */
    @Override
    public void process(RequestEvent evt) {
        Dialog dialog = evt.getDialog();
        if (dialog == null || dialog.getState() != DialogState.CONFIRMED) {
            return;
        }

        String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
        String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
        SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
        if (sendRtpItem == null) {
            // The record may have expired or been removed by a BYE; there is nothing to push.
            logger.warn("ACK received but no send-RTP record found for [{}/{}]", platformGbId, channelId);
            return;
        }

        String isUdp = sendRtpItem.isTcp() ? "0" : "1";
        String deviceId = sendRtpItem.getDeviceId();
        StreamInfo streamInfo;
        if (deviceId == null) {
            // No device id on the record: the stream is identified by the app/stream stored on it.
            streamInfo = new StreamInfo();
            streamInfo.setApp(sendRtpItem.getApp());
            streamInfo.setStreamId(sendRtpItem.getStreamId());
        } else {
            streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
            if (streamInfo == null) {
                logger.warn("ACK received but no play record found for device [{}/{}]", deviceId, channelId);
                return;
            }
            sendRtpItem.setStreamId(streamInfo.getStreamId());
            streamInfo.setApp("rtp");
        }
        redisCatchStorage.updateSendRTPSever(sendRtpItem);
        logger.info(platformGbId);
        logger.info(channelId);

        Map<String, Object> param = new HashMap<>();
        param.put("vhost","__defaultVhost__");
        param.put("app",streamInfo.getApp());
        param.put("stream",streamInfo.getStreamId());
        param.put("ssrc", sendRtpItem.getSsrc());
        param.put("dst_url",sendRtpItem.getIp());
        param.put("dst_port", sendRtpItem.getPort());
        param.put("is_udp", isUdp);

        // Forwarding can only start once the device is actually pushing the stream.
        long startTime = System.currentTimeMillis();
        while (System.currentTimeMillis() - startTime < STREAM_READY_TIMEOUT_MS) {
            MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            // Boolean.TRUE.equals guards against a null result being unboxed.
            if (Boolean.TRUE.equals(zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId()))) {
                logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]",
                        streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort());
                zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
                return;
            }
            logger.info("等待设备推流[{}/{}].......", streamInfo.getApp() ,streamInfo.getStreamId());
            try {
                Thread.sleep(STREAM_READY_POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Restore the flag and stop waiting; continuing would spin on an interrupted thread.
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while waiting for stream [{}/{}]", streamInfo.getApp(), streamInfo.getStreamId());
                return;
            }
        }
        logger.info("设备推流[{}/{}]超时,终止向上级推流", streamInfo.getApp() ,streamInfo.getStreamId());
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
New file @@ -0,0 +1,115 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;

import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;

/**
 * @description: BYE request processor. Acknowledges the BYE, stops forwarding RTP for
 *               the session and releases the related play state.
 * @author: lawrencehj
 * @date: 2021-03-09
 */
@Component
public class ByeRequestProcessor extends SIPRequestProcessorAbstract {

    private Logger logger = LoggerFactory.getLogger(ByeRequestProcessor.class);

    @Autowired
    private ISIPCommander cmder;

    @Autowired
    private IRedisCatchStorage redisCatchStorage;

    @Autowired
    private IVideoManagerStorager storager;

    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;

    @Autowired
    private IMediaServerService mediaServerService;

    private String method = "BYE";

    @Autowired
    private SIPProcessorObserver sipProcessorObserver;

    /** Registers this processor for the BYE method. */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Subscribe to message processing.
        sipProcessorObserver.addRequestProcessor(method, this);
    }

    /**
     * Handles a BYE request: replies 200 OK and, when the dialog is terminated, stops
     * the outgoing RTP stream recorded for the From user / To user pair and clears the
     * play state of the matching device.
     *
     * @param evt the BYE request event
     */
    @Override
    public void process(RequestEvent evt) {
        try {
            responseAck(evt, Response.OK);
            Dialog dialog = evt.getDialog();
            // Constant-first comparison: the dialog state itself may be null.
            if (dialog == null || !DialogState.TERMINATED.equals(dialog.getState())) {
                return;
            }

            String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
            String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
            SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
            logger.info("收到bye, [{}/{}]", platformGbId, channelId);

            if (sendRtpItem != null) {
                stopSendingRtp(sendRtpItem, platformGbId, channelId);
            }

            // The BYE may have been sent by the device itself.
            // NOTE(review): the From user is passed as a channel id here — confirm this is intended.
            Device device = storager.queryVideoDeviceByChannelId(platformGbId);
            if (device != null) {
                StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
                if (streamInfo != null) {
                    redisCatchStorage.stopPlay(streamInfo);
                }
                storager.stopPlay(device.getDeviceId(), channelId);
                mediaServerService.closeRTPServer(device, channelId);
            }
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("Failed to process BYE request", e);
        }
    }

    /**
     * Stops forwarding the stream described by the record, removes the record, and asks
     * the device to stop pushing when the stream has no remaining readers.
     */
    private void stopSendingRtp(SendRtpItem sendRtpItem, String platformGbId, String channelId) {
        String streamId = sendRtpItem.getStreamId();
        Map<String, Object> param = new HashMap<>();
        param.put("vhost","__defaultVhost__");
        param.put("app",sendRtpItem.getApp());
        param.put("stream",streamId);
        param.put("ssrc",sendRtpItem.getSsrc());
        logger.info("停止向上级推流:{}", streamId);
        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
        redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
        if (zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId) == 0) {
            logger.info("{}无其它观看者,通知设备停止推流", streamId);
            cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java
New file @@ -0,0 +1,40 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;

import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.sip.RequestEvent;

/**
 * @description: CANCEL request processor. Registers itself for the CANCEL method;
 *               the handling itself is not implemented yet.
 * @author: swwheihei
 * @date: 2020-05-03 17:32:23
 */
@Component
public class CancelRequestProcessor extends SIPRequestProcessorAbstract {

    /** SIP method name this processor is registered under. */
    private static final String METHOD = "CANCEL";

    @Autowired
    private SIPProcessorObserver sipProcessorObserver;

    /** Subscribes this processor to CANCEL requests once its dependencies are injected. */
    @Override
    public void afterPropertiesSet() throws Exception {
        sipProcessorObserver.addRequestProcessor(METHOD, this);
    }

    /**
     * Handles a CANCEL request. Currently a no-op.
     *
     * @param evt the event
     */
    @Override
    public void process(RequestEvent evt) {
        // TODO priority 99: implement CANCEL handling. This is normally a cascade
        // message in which the upper platform tells the lower one to cancel a request.
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
New file @@ -0,0 +1,386 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;

import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.sdp.*;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.Vector;

/**
 * @description: Handles INVITE requests. Requests from a known upper platform are
 *               answered by forwarding a device channel or a live stream; requests
 *               from a known device are treated as voice-broadcast invites.
 * @author: panll
 * @date: 2021-01-14
 */
@SuppressWarnings("rawtypes")
@Component
public class InviteRequestProcessor extends SIPRequestProcessorAbstract {

    private final static Logger logger = LoggerFactory.getLogger(InviteRequestProcessor.class);

    private String method = "INVITE";

    /** Number of characters read after "y=" as the SSRC. */
    private static final int SSRC_LENGTH = 10;

    /** SSRC used for a device invite whose SDP carries no "y=" line. */
    private static final String DEFAULT_DEVICE_SSRC = "0000000404";

    @Autowired
    private SIPCommanderFroPlatform cmderFroPlatform;

    @Autowired
    private IVideoManagerStorager storager;

    @Autowired
    private IRedisCatchStorage redisCatchStorage;

    @Autowired
    private SIPCommander cmder;

    @Autowired
    private IPlayService playService;

    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;

    @Autowired
    private IMediaServerService mediaServerService;

    @Autowired
    private SIPProcessorObserver sipProcessorObserver;

    /** Registers this processor for the INVITE method. */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Subscribe to message processing.
        sipProcessorObserver.addRequestProcessor(method, this);
    }

    /** Media parameters picked from the first SDP media description offering the wanted payload. */
    private static final class MediaOffer {
        private int port = -1;
        private boolean mediaTransmissionTCP = false;
        /** {@code null} when the offer carries no recognised "setup" attribute. */
        private Boolean tcpActive = null;
    }

    /**
     * Handles an INVITE request. This is normally a cascade message in which an upper
     * platform asks this platform for video.
     *
     * @param evt the request event
     */
    @Override
    public void process(RequestEvent evt) {
        try {
            Request request = evt.getRequest();
            SipURI sipURI = (SipURI) request.getRequestURI();
            String channelId = sipURI.getUser();
            FromHeader fromHeader = (FromHeader) request.getHeader(FromHeader.NAME);
            AddressImpl address = (AddressImpl) fromHeader.getAddress();
            SipUri uri = (SipUri) address.getURI();
            String requesterId = uri.getUser();

            if (requesterId == null || channelId == null) {
                logger.info("无法从FromHeader的Address中获取到平台id,返回400");
                responseAck(evt, Response.BAD_REQUEST); // incomplete parameters: 400 Bad Request
                return;
            }

            // Is the requester an upper platform?
            ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
            if (platform != null) {
                processPlatformInvite(evt, request, requesterId, channelId);
            } else {
                // Not a platform: it may be a device (usually one that receives voice broadcast).
                processDeviceInvite(evt, request, requesterId);
            }
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("Failed to respond to INVITE request", e);
        } catch (SdpException e) {
            logger.warn("sdp解析错误", e);
        }
    }

    /** Handles an INVITE sent by an upper platform for one of its channels or live streams. */
    private void processPlatformInvite(RequestEvent evt, Request request, String requesterId, String channelId)
            throws SipException, InvalidArgumentException, ParseException, SdpException {
        // Is the requested id a device channel or a live stream shared with this platform?
        DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
        GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
        MediaServerItem mediaServerItem = null;

        if (channel != null && gbStream == null) {
            if (channel.getStatus() == 0) {
                logger.info("通道离线,返回400");
                responseAck(evt, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline");
                return;
            }
            responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // channel exists: 181
        } else if (channel == null && gbStream != null) {
            String mediaServerId = gbStream.getMediaServerId();
            mediaServerItem = mediaServerService.getOne(mediaServerId);
            if (mediaServerItem == null) {
                logger.info("[ app={}, stream={} ]找不到zlm {},返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId);
                responseAck(evt, Response.GONE, "media server not found");
                return;
            }
            // Boolean.TRUE.equals guards against a null result being unboxed.
            if (!Boolean.TRUE.equals(zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()))) {
                logger.info("[ app={}, stream={} ]通道离线,返回400",gbStream.getApp(), gbStream.getStream());
                responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
                return;
            }
            responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // stream exists: 181
        } else {
            logger.info("通道不存在,返回404");
            responseAck(evt, Response.NOT_FOUND); // unknown channel: 404
            return;
        }

        byte[] rawContent = request.getRawContent();
        if (rawContent == null) {
            logger.info("INVITE without SDP body, replying 400");
            responseAck(evt, Response.BAD_REQUEST);
            return;
        }
        // NOTE(review): decoded with the platform default charset, as before — confirm the expected encoding.
        String contentString = new String(rawContent);
        // The JAIN SDP parser does not support the "y=" field: cut it off before parsing.
        int ssrcIndex = contentString.indexOf("y=");
        if (ssrcIndex < 0) {
            logger.info("INVITE SDP carries no y= (ssrc) field, replying 400");
            responseAck(evt, Response.BAD_REQUEST);
            return;
        }
        String ssrc = extractSsrc(contentString, ssrcIndex);
        SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(contentString.substring(0, ssrcIndex));

        // Look for PS payload type 96.
        MediaOffer offer = findMediaOffer(sdp, "96");
        if (offer == null || offer.port == -1) {
            logger.info("不支持的媒体格式,返回415");
            responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // unsupported format: 415
            return;
        }

        String username = sdp.getOrigin().getUsername();
        String addressStr = sdp.getOrigin().getAddress();
        logger.info("[上级点播]用户:{}, 地址:{}:{}, ssrc:{}", username, addressStr, offer.port, ssrc);

        // Exactly one of channel / gbStream is non-null here; it tells a GB28181 channel from a live stream.
        if (channel != null) {
            forwardDeviceChannel(evt, requesterId, channelId, addressStr, offer, ssrc);
        } else {
            forwardLiveStream(evt, requesterId, channelId, gbStream, mediaServerItem, addressStr, offer, ssrc);
        }
    }

    /** Asks the device owning the channel to start streaming and answers the INVITE once it does. */
    private void forwardDeviceChannel(RequestEvent evt, String requesterId, String channelId,
                                      String addressStr, MediaOffer offer, String ssrc)
            throws SipException, InvalidArgumentException, ParseException {
        Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId);
        if (device == null) {
            logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channelId);
            responseAck(evt, Response.SERVER_INTERNAL_ERROR);
            return;
        }
        MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device);
        if (mediaServerItem == null) {
            logger.warn("未找到可用的zlm");
            responseAck(evt, Response.BUSY_HERE);
            return;
        }
        SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, offer.port, ssrc, requesterId,
                device.getDeviceId(), channelId, offer.mediaTransmissionTCP);
        // The null check must come before the item is touched.
        if (sendRtpItem == null) {
            logger.warn("服务器端口资源不足");
            responseAck(evt, Response.BUSY_HERE);
            return;
        }
        if (offer.tcpActive != null) {
            sendRtpItem.setTcpActive(offer.tcpActive);
        }

        // Stored in redis so that a reply can still be sent on timeout.
        redisCatchStorage.updateSendRTPSever(sendRtpItem);

        // Tell the device to start pushing.
        PlayResult playResult = playService.play(mediaServerItem, device.getDeviceId(), channelId, (mediaServerItemInUSe, responseJSON) -> {
            // The stream arrived: reply 200 OK and wait for the ACK.
            sendRtpItem.setStatus(1);
            redisCatchStorage.updateSendRTPSever(sendRtpItem);
            // TODO add TCP support
            sendPlaySdp(evt, buildPlaySdp(channelId, String.valueOf(mediaServerItemInUSe.getSdpIp()),
                    String.valueOf(sendRtpItem.getLocalPort()), ssrc));
        }, (event) -> {
            // Unknown error: forward the status code of the failed device play request.
            try {
                Response response = getMessageFactory().createResponse(event.statusCode, evt.getRequest());
                ServerTransaction serverTransaction = getServerTransaction(evt);
                if (serverTransaction == null) {
                    logger.warn("No server transaction available to forward play error {}", event.statusCode);
                    return;
                }
                serverTransaction.sendResponse(response);
                if (serverTransaction.getDialog() != null) {
                    serverTransaction.getDialog().delete();
                }
            } catch (ParseException | SipException | InvalidArgumentException e) {
                logger.error("Failed to forward play error to the requester", e);
            }
        });
        if (logger.isDebugEnabled() && playResult != null && playResult.getResult() != null) {
            logger.debug(playResult.getResult().toString());
        }
    }

    /** Answers the INVITE for a live stream that is already available on a media server. */
    private void forwardLiveStream(RequestEvent evt, String requesterId, String channelId, GbStream gbStream,
                                   MediaServerItem mediaServerItem, String addressStr, MediaOffer offer, String ssrc)
            throws SipException, InvalidArgumentException, ParseException {
        SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, offer.port, ssrc, requesterId,
                gbStream.getApp(), gbStream.getStream(), channelId, offer.mediaTransmissionTCP);
        // The null check must come before the item is touched.
        if (sendRtpItem == null) {
            logger.warn("服务器端口资源不足");
            responseAck(evt, Response.BUSY_HERE);
            return;
        }
        if (offer.tcpActive != null) {
            sendRtpItem.setTcpActive(offer.tcpActive);
        }

        // Stored in redis so that a reply can still be sent on timeout.
        redisCatchStorage.updateSendRTPSever(sendRtpItem);

        sendRtpItem.setStatus(1);
        redisCatchStorage.updateSendRTPSever(sendRtpItem);
        // TODO add TCP support
        sendPlaySdp(evt, buildPlaySdp(channelId, String.valueOf(mediaServerItem.getSdpIp()),
                String.valueOf(sendRtpItem.getLocalPort()), ssrc));
    }

    /** Handles an INVITE sent by a registered device (voice broadcast); only parses and logs the offer. */
    private void processDeviceInvite(RequestEvent evt, Request request, String requesterId)
            throws SipException, InvalidArgumentException, ParseException, SdpException {
        Device device = storager.queryVideoDevice(requesterId);
        if (device == null) {
            logger.warn("来自无效设备/平台的请求");
            responseAck(evt, Response.BAD_REQUEST);
            return;
        }
        logger.info("收到设备{}的语音广播Invite请求", requesterId);
        responseAck(evt, Response.TRYING);

        byte[] rawContent = request.getRawContent();
        if (rawContent == null) {
            logger.info("INVITE without SDP body, replying 400");
            responseAck(evt, Response.BAD_REQUEST);
            return;
        }
        // NOTE(review): decoded with the platform default charset, as before — confirm the expected encoding.
        String contentString = new String(rawContent);
        // The JAIN SDP parser does not support the "y=" and "f=" fields: cut them off before parsing.
        String substring = contentString;
        String ssrc = DEFAULT_DEVICE_SSRC;
        int ssrcIndex = contentString.indexOf("y=");
        if (ssrcIndex > 0) {
            substring = contentString.substring(0, ssrcIndex);
            ssrc = extractSsrc(contentString, ssrcIndex);
        }
        int formatIndex = substring.indexOf("f=");
        if (formatIndex > 0) {
            substring = contentString.substring(0, formatIndex);
        }
        SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);

        // Look for payload type 8.
        MediaOffer offer = findMediaOffer(sdp, "8");
        if (offer == null || offer.port == -1) {
            logger.info("不支持的媒体格式,返回415");
            responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // unsupported format: 415
            return;
        }
        String username = sdp.getOrigin().getUsername();
        String addressStr = sdp.getOrigin().getAddress();
        logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, offer.port, ssrc);
    }

    /**
     * Reads the SSRC that follows "y=": at most {@link #SSRC_LENGTH} characters, never
     * past the end of the line or of the text, so a following "f=" field is not included.
     */
    private static String extractSsrc(String sdpText, int yIndex) {
        int start = yIndex + 2;
        int end = start;
        while (end < sdpText.length() && end - start < SSRC_LENGTH) {
            char c = sdpText.charAt(end);
            if (c == '\r' || c == '\n') {
                break;
            }
            end++;
        }
        return sdpText.substring(start, end);
    }

    /**
     * Returns the parameters of the first media description offering the given payload
     * type, or {@code null} when no media description offers it.
     */
    private static MediaOffer findMediaOffer(SessionDescription sdp, String payloadType) throws SdpException {
        Vector mediaDescriptions = sdp.getMediaDescriptions(true);
        for (Object description : mediaDescriptions) {
            MediaDescription mediaDescription = (MediaDescription) description;
            Media media = mediaDescription.getMedia();
            Vector mediaFormats = media.getMediaFormats(false);
            if (mediaFormats == null || !mediaFormats.contains(payloadType)) {
                continue;
            }
            MediaOffer offer = new MediaOffer();
            offer.port = media.getMediaPort();
            // Tell TCP from UDP transmission; UDP is the default.
            if ("TCP/RTP/AVP".equals(media.getProtocol())) {
                String setup = mediaDescription.getAttribute("setup");
                if (setup != null) {
                    offer.mediaTransmissionTCP = true;
                    if ("active".equals(setup)) {
                        offer.tcpActive = true;
                    } else if ("passive".equals(setup)) {
                        offer.tcpActive = false;
                    }
                }
            }
            return offer;
        }
        return null;
    }

    /** Builds the send-only PS/90000 SDP answer returned to the requester. */
    private static String buildPlaySdp(String channelId, String sdpIp, String localPort, String ssrc) {
        StringBuilder content = new StringBuilder(200);
        content.append("v=0\r\n");
        content.append("o=").append(channelId).append(" 0 0 IN IP4 ").append(sdpIp).append("\r\n");
        content.append("s=Play\r\n");
        content.append("c=IN IP4 ").append(sdpIp).append("\r\n");
        content.append("t=0 0\r\n");
        content.append("m=video ").append(localPort).append(" RTP/AVP 96\r\n");
        content.append("a=sendonly\r\n");
        content.append("a=rtpmap:96 PS/90000\r\n");
        content.append("y=").append(ssrc).append("\r\n");
        content.append("f=\r\n");
        return content.toString();
    }

    /** Sends the SDP answer as 200 OK; failures are logged and not propagated. */
    private void sendPlaySdp(RequestEvent evt, String sdp) {
        try {
            responseAck(evt, sdp);
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("Failed to send 200 OK with SDP", e);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/MessageRequestProcessor.java
New file @@ -0,0 +1,1103 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;

import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.VManageBootstrap;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.callback.CheckForAllRecordsThread;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.gb28181.utils.DateUtil;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceAlarmService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.GpsUtil;
import com.genersoft.iot.vmp.utils.SpringBeanFactory;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.address.SipUri;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;

/**
 * Processor for SIP MESSAGE requests.
 *
 * <p>Registers itself for the {@code MESSAGE} method and dispatches every incoming request
 * on the {@code CmdType} element of its XML body. Handlers reply to the sender with
 * {@code responseAck} and, for responses to commands issued by this platform, hand the
 * parsed body to {@link DeferredResultHolder} under a key built from the command type,
 * the sender id and the {@code DeviceID} element.
 *
 * @author: swwheihei
 * @date: 2020-05-03 17:32:41
 */
@SuppressWarnings(value={"unchecked", "rawtypes"})
@Component
public class MessageRequestProcessor extends SIPRequestProcessorAbstract {

    /**
     * Cache keys for which a {@link CheckForAllRecordsThread} has already been started.
     * Wrapped in a synchronized list because it is read and written from several threads;
     * compound check-then-add sequences additionally lock on the list itself.
     * NOTE(review): presumably CheckForAllRecordsThread removes its key when done - confirm.
     */
    public static volatile List<String> threadNameList = Collections.synchronizedList(new ArrayList<String>());

    private final static Logger logger = LoggerFactory.getLogger(MessageRequestProcessor.class);

    private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_";
    private static final String MESSAGE_KEEP_ALIVE = "Keepalive";
    private static final String MESSAGE_CONFIG_DOWNLOAD = "ConfigDownload";
    private static final String MESSAGE_CATALOG = "Catalog";
    private static final String MESSAGE_DEVICE_INFO = "DeviceInfo";
    private static final String MESSAGE_ALARM = "Alarm";
    private static final String MESSAGE_RECORD_INFO = "RecordInfo";
    private static final String MESSAGE_MEDIA_STATUS = "MediaStatus";
    private static final String MESSAGE_BROADCAST = "Broadcast";
    private static final String MESSAGE_DEVICE_STATUS = "DeviceStatus";
    private static final String MESSAGE_DEVICE_CONTROL = "DeviceControl";
    private static final String MESSAGE_DEVICE_CONFIG = "DeviceConfig";
    private static final String MESSAGE_MOBILE_POSITION = "MobilePosition";
    private static final String MESSAGE_PRESET_QUERY = "PresetQuery";

    private String method = "MESSAGE";

    @Autowired
    private UserSetup userSetup;

    @Autowired
    private SIPCommander cmder;

    @Autowired
    private SipConfig config;

    @Autowired
    private SIPCommanderFroPlatform cmderFroPlatform;

    @Autowired
    private IVideoManagerStorager storager;

    @Autowired
    private IRedisCatchStorage redisCatchStorage;

    @Autowired
    private EventPublisher publisher;

    @Autowired
    private RedisUtil redis;

    @Autowired
    private DeferredResultHolder deferredResultHolder;

    @Autowired
    private DeviceOffLineDetector offLineDetector;

    @Autowired
    private IDeviceAlarmService deviceAlarmService;

    @Autowired
    private SIPProcessorObserver sipProcessorObserver;

    /**
     * Subscribes this processor to requests of the {@code MESSAGE} method.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        sipProcessorObserver.addRequestProcessor(method, this);
    }

    /**
     * Handles a MESSAGE request by dispatching on the {@code CmdType} element of its body.
     * Unknown (or missing) command types are acknowledged with 200 OK.
     *
     * @param evt the incoming request event
     */
    @Override
    public void process(RequestEvent evt) {
        try {
            Element rootElement = getRootElement(evt);
            String cmd = getText(rootElement, "CmdType");
            // A String switch cannot take null; map it to "" so it falls through to default.
            switch (cmd == null ? "" : cmd) {
                case MESSAGE_KEEP_ALIVE:
                    logger.debug("接收到KeepAlive消息");
                    processMessageKeepAlive(evt);
                    break;
                case MESSAGE_CONFIG_DOWNLOAD:
                    logger.debug("接收到ConfigDownload消息");
                    processMessageConfigDownload(evt);
                    break;
                case MESSAGE_CATALOG:
                    logger.debug("接收到Catalog消息");
                    processMessageCatalogList(evt);
                    break;
                case MESSAGE_DEVICE_INFO:
                    processMessageDeviceInfo(evt);
                    break;
                case MESSAGE_DEVICE_STATUS:
                    processMessageDeviceStatus(evt);
                    break;
                case MESSAGE_DEVICE_CONTROL:
                    logger.debug("接收到DeviceControl消息");
                    processMessageDeviceControl(evt);
                    break;
                case MESSAGE_DEVICE_CONFIG:
                    logger.info("接收到DeviceConfig消息");
                    processMessageDeviceConfig(evt);
                    break;
                case MESSAGE_ALARM:
                    logger.debug("接收到Alarm消息");
                    processMessageAlarm(evt);
                    break;
                case MESSAGE_RECORD_INFO:
                    logger.debug("接收到RecordInfo消息");
                    processMessageRecordInfo(evt);
                    break;
                case MESSAGE_MEDIA_STATUS:
                    logger.debug("接收到MediaStatus消息");
                    processMessageMediaStatus(evt);
                    break;
                case MESSAGE_MOBILE_POSITION:
                    logger.debug("接收到MobilePosition消息");
                    processMessageMobilePosition(evt);
                    break;
                case MESSAGE_PRESET_QUERY:
                    logger.debug("接收到PresetQuery消息");
                    processMessagePresetQuery(evt);
                    break;
                case MESSAGE_BROADCAST:
                    processMessageBroadcast(evt);
                    break;
                default:
                    logger.debug("接收到消息:" + cmd);
                    responseAck(evt, Response.OK);
                    break;
            }
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            logger.error("处理MESSAGE请求失败", e);
        }
    }

    /**
     * Handles a MobilePosition message: converts the reported position to BD-09, stores it
     * and replies 200 OK. Replies 404 for an unknown sender and 400 when longitude or
     * latitude is missing or not numeric.
     *
     * @param evt the incoming request event
     */
    private void processMessageMobilePosition(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            Device device = findDeviceOrReplyNotFound(evt, deviceId, "处理MobilePosition移动位置消息时未找到设备信息");
            if (device == null) {
                return;
            }
            Element rootElement = getRootElement(evt, device.getCharset());
            String longitude = getText(rootElement, "Longitude");
            String latitude = getText(rootElement, "Latitude");
            // Validate before parsing so a malformed body gets a reply instead of an
            // unchecked exception escaping the handler.
            if (!NumericUtil.isDouble(longitude) || !NumericUtil.isDouble(latitude)) {
                logger.warn("MobilePosition消息缺少有效的经纬度, 回复400, 设备: {}", deviceId);
                responseAck(evt, Response.BAD_REQUEST);
                return;
            }

            MobilePosition mobilePosition = new MobilePosition();
            if (!StringUtils.isEmpty(device.getName())) {
                mobilePosition.setDeviceName(device.getName());
            }
            mobilePosition.setDeviceId(deviceId);
            mobilePosition.setChannelId(getText(rootElement, "DeviceID"));
            mobilePosition.setTime(getText(rootElement, "Time"));
            mobilePosition.setLongitude(Double.parseDouble(longitude));
            mobilePosition.setLatitude(Double.parseDouble(latitude));
            mobilePosition.setSpeed(parseDoubleOrDefault(getText(rootElement, "Speed"), 0.0));
            mobilePosition.setDirection(parseDoubleOrDefault(getText(rootElement, "Direction"), 0.0));
            mobilePosition.setAltitude(parseDoubleOrDefault(getText(rootElement, "Altitude"), 0.0));
            mobilePosition.setReportSource("Mobile Position");
            saveMobilePosition(deviceId, mobilePosition);

            responseAck(evt, Response.OK);
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            logger.error("处理MobilePosition消息失败", e);
        }
    }

    /**
     * Handles a DeviceStatus message. A {@code Query} root is a request from the sender and
     * is answered through {@code cmderFroPlatform}; any other root is treated as a response
     * and forwarded to the waiting callback.
     *
     * @param evt the incoming request event
     */
    private void processMessageDeviceStatus(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            Device device = findDeviceOrReplyNotFound(evt, deviceId, "处理DeviceStatus设备状态Message时未找到设备信息");
            if (device == null) {
                return;
            }
            Element rootElement = getRootElement(evt);
            String channelId = textOf(rootElement, "DeviceID");
            if (channelId == null) {
                replyBadRequest(evt, "DeviceStatus", "DeviceID");
                return;
            }

            if (rootElement.getName().equalsIgnoreCase("Query")) {
                logger.info("接收到DeviceStatus查询消息");
                FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
                String platformId = ((SipUri) fromHeader.getAddress().getURI()).getUser();
                if (platformId == null) {
                    responseAck(evt, Response.NOT_FOUND);
                    return;
                }
                String sn = textOf(rootElement, "SN");
                if (sn == null) {
                    replyBadRequest(evt, "DeviceStatus", "SN");
                    return;
                }
                responseAck(evt, Response.OK);
                ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
                cmderFroPlatform.deviceStatusResponse(parentPlatform, sn, fromHeader.getTag());
                return;
            }

            logger.info("接收到DeviceStatus应答消息");
            // Only reply when the device exists in storage.
            if (storager.exists(deviceId)) {
                responseAck(evt, Response.OK);
                publishJsonResult(DeferredResultHolder.CALLBACK_CMD_DEVICESTATUS + deviceId + channelId, rootElement);
                if (offLineDetector.isOnline(deviceId)) {
                    publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
                }
            }
        } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
            logger.error("处理DeviceStatus消息失败", e);
        }
    }

    /**
     * Handles a DeviceControl message. A {@code Response} root answers a command issued by
     * this platform; any other root is a command received from the sender (TeleBoot / PTZ).
     *
     * @param evt the incoming request event
     */
    private void processMessageDeviceControl(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            Device device = findDeviceOrReplyNotFound(evt, deviceId, "处理DeviceControl设备状态Message未找到设备信息");
            if (device == null) {
                return;
            }
            Element rootElement = getRootElement(evt);
            String channelId = getText(rootElement, "DeviceID");
            responseAck(evt, Response.OK);

            if ("Response".equals(rootElement.getName())) {
                publishJsonResult(DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + deviceId + channelId, rootElement);
            } else {
                handleIncomingDeviceControl(evt, rootElement, deviceId);
            }
        } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
            logger.error("处理DeviceControl消息失败", e);
        }
    }

    /**
     * Executes a DeviceControl command received from the sender: a TeleBoot addressed to
     * the sender id itself restarts this platform, and a PTZCmd addressed elsewhere is
     * forwarded as a front-end command.
     */
    private void handleIncomingDeviceControl(RequestEvent evt, Element rootElement, String deviceId)
            throws SipException, InvalidArgumentException, ParseException {
        String platformId = sipUserOf(evt, FromHeader.NAME);
        String targetGBId = sipUserOf(evt, ToHeader.NAME);
        boolean targetsSender = deviceId.equals(targetGBId);

        if (!StringUtils.isEmpty(getText(rootElement, "TeleBoot")) && targetsSender) {
            // Restarting this platform: unregister first, the SIP stack is torn down afterwards.
            logger.info("执行远程启动本平台命令");
            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
            cmderFroPlatform.unregister(parentPlatform, null, null);
            restartPlatformAsync();
        }
        // TeleBoot for a specific device is not implemented.

        String ptzCmd = getText(rootElement, "PTZCmd");
        if (!StringUtils.isEmpty(ptzCmd) && !targetsSender) {
            // NOTE(review): the sender id is passed as the channel id here, exactly as before -
            // confirm whether the To-header user was intended.
            Device deviceForPlatform = storager.queryVideoDeviceByPlatformIdAndChannelId(platformId, deviceId);
            cmder.fronEndCmd(deviceForPlatform, deviceId, ptzCmd);
        }
    }

    /**
     * Starts a non-daemon thread that waits three seconds, stops the SIP stack, removes its
     * listening points and providers, and then restarts the application.
     */
    private void restartPlatformAsync() {
        Thread restartThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                    SipProvider up = (SipProvider) SpringBeanFactory.getBean("udpSipProvider");
                    SipStackImpl stack = (SipStackImpl) up.getSipStack();
                    stack.stop();
                    Iterator listener = stack.getListeningPoints();
                    while (listener.hasNext()) {
                        stack.deleteListeningPoint((ListeningPoint) listener.next());
                    }
                    Iterator providers = stack.getSipProviders();
                    while (providers.hasNext()) {
                        stack.deleteSipProvider((SipProvider) providers.next());
                    }
                    VManageBootstrap.restart();
                } catch (InterruptedException e) {
                    // Restart abandoned; restore the flag so the interruption stays observable.
                    Thread.currentThread().interrupt();
                } catch (ObjectInUseException e) {
                    logger.error("重启平台时释放SIP资源失败", e);
                }
            }
        });
        restartThread.setDaemon(false);
        restartThread.start();
    }

    /**
     * Handles a DeviceConfig message; only {@code Response} roots are acted upon.
     *
     * @param evt the incoming request event
     */
    private void processMessageDeviceConfig(RequestEvent evt) {
        relayResponseToCallback(evt, DeferredResultHolder.CALLBACK_CMD_DEVICECONFIG, "DeviceConfig",
                "处理DeviceConfig设备状态Message消息时未找到设备信息");
    }

    /**
     * Handles a ConfigDownload message; only {@code Response} roots are acted upon.
     *
     * @param evt the incoming request event
     */
    private void processMessageConfigDownload(RequestEvent evt) {
        relayResponseToCallback(evt, DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD, "ConfigDownload",
                "处理ConfigDownload设备状态Message时未找到设备信息");
    }

    /**
     * Handles a PresetQuery (preset list) message; only {@code Response} roots are acted upon.
     *
     * @param evt the incoming request event
     */
    private void processMessagePresetQuery(RequestEvent evt) {
        relayResponseToCallback(evt, DeferredResultHolder.CALLBACK_CMD_PRESETQUERY, "PresetQuery",
                "处理PresetQuery预置位列表Message时未找到设备信息");
    }

    /**
     * Handles a Broadcast (audio broadcast) message; only {@code Response} roots are acted upon.
     *
     * @param evt the incoming request event
     */
    private void processMessageBroadcast(RequestEvent evt) {
        relayResponseToCallback(evt, DeferredResultHolder.CALLBACK_CMD_BROADCAST, "Broadcast",
                "处理Broadcast语音广播Message时未找到设备信息");
    }

    /**
     * Shared flow for messages that only matter as responses to commands sent by this
     * platform: look up the sender (404 if unknown), reply 200 OK, and if the root element
     * is {@code Response} publish the body as JSON under
     * {@code keyPrefix + deviceId + DeviceID}. Other roots are acknowledged and ignored.
     *
     * @param evt         the incoming request event
     * @param keyPrefix   callback key prefix for this command type
     * @param cmdType     command name, used in the failure log only
     * @param notFoundLog warning logged when the sender is unknown
     */
    private void relayResponseToCallback(RequestEvent evt, String keyPrefix, String cmdType, String notFoundLog) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            Device device = findDeviceOrReplyNotFound(evt, deviceId, notFoundLog);
            if (device == null) {
                return;
            }
            Element rootElement = getRootElement(evt);
            String key = keyPrefix + deviceId + getText(rootElement, "DeviceID");
            responseAck(evt, Response.OK);
            if ("Response".equals(rootElement.getName())) {
                publishJsonResult(key, rootElement);
            }
        } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
            logger.error("处理{}消息失败", cmdType, e);
        }
    }

    /**
     * Handles a DeviceInfo message. A {@code Query} root is answered on behalf of the parent
     * platform identified by the sender id; any other root updates the stored device with
     * the reported name, manufacturer, model and firmware.
     *
     * @param evt the incoming request event
     */
    private void processMessageDeviceInfo(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            Device device = storager.queryVideoDevice(deviceId);
            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(deviceId);
            Element rootElement = getRootElement(evt);
            String requestName = rootElement.getName();
            Element deviceIdElement = rootElement.element("DeviceID");
            if (deviceIdElement == null) {
                replyBadRequest(evt, "DeviceInfo", "DeviceID");
                return;
            }
            String key = DeferredResultHolder.CALLBACK_CMD_DEVICEINFO + deviceId + deviceIdElement.getTextTrim();
            if (device != null) {
                // Re-parse with the charset configured for the device so text fields decode correctly.
                rootElement = getRootElement(evt, device.getCharset());
            }

            if ("Query".equals(requestName)) {
                logger.info("接收到DeviceInfo查询消息");
                FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
                if (parentPlatform == null) {
                    responseAck(evt, Response.NOT_FOUND);
                    return;
                }
                String sn = textOf(rootElement, "SN");
                if (sn == null) {
                    replyBadRequest(evt, "DeviceInfo", "SN");
                    return;
                }
                responseAck(evt, Response.OK);
                cmderFroPlatform.deviceInfoResponse(parentPlatform, sn, fromHeader.getTag());
                return;
            }

            logger.debug("接收到DeviceInfo应答消息");
            if (device == null) {
                logger.warn("处理DeviceInfo设备信息Message时未找到设备信息");
                responseAck(evt, Response.NOT_FOUND);
                return;
            }
            device.setName(getText(rootElement, "DeviceName"));
            device.setManufacturer(getText(rootElement, "Manufacturer"));
            device.setModel(getText(rootElement, "Model"));
            device.setFirmware(getText(rootElement, "Firmware"));
            if (StringUtils.isEmpty(device.getStreamMode())) {
                device.setStreamMode("UDP");
            }
            storager.updateDevice(device);

            publishResult(key, device);
            responseAck(evt, Response.OK);
            if (offLineDetector.isOnline(deviceId)) {
                publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
            }
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            logger.error("处理DeviceInfo消息失败", e);
        }
    }

    /**
     * Handles a Catalog message. A {@code Query} root is answered with the channels shared
     * with the requesting parent platform; any other root is a device's catalog response
     * whose items are stored as channels.
     *
     * @param evt the incoming request event
     */
    private void processMessageCatalogList(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            Device device = storager.queryVideoDevice(deviceId);
            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(deviceId);
            Element rootElement = getRootElement(evt);

            if (rootElement.getName().equalsIgnoreCase("Query")) {
                replyCatalogQuery(evt, rootElement, parentPlatform);
                return;
            }

            if (device == null) {
                logger.warn("收到catalog设备目录列表请求时未找到设备信息");
                responseAck(evt, Response.NOT_FOUND);
                return;
            }
            Element deviceListElement = getRootElement(evt, device.getCharset()).element("DeviceList");
            // A response without a DeviceList is treated as an empty catalog rather than an error.
            if (deviceListElement != null) {
                Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
                while (deviceListIterator.hasNext()) {
                    Element itemDevice = deviceListIterator.next();
                    Element channelDeviceElement = itemDevice.element("DeviceID");
                    if (channelDeviceElement == null) {
                        continue;
                    }
                    DeviceChannel deviceChannel = parseCatalogItem(itemDevice, channelDeviceElement.getText());
                    storager.updateChannel(device.getDeviceId(), deviceChannel);
                }
            }

            publishResult(DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId, device);
            responseAck(evt, Response.OK);
            if (offLineDetector.isOnline(deviceId)) {
                publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
            }
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            logger.error("处理Catalog消息失败", e);
        }
    }

    /**
     * Answers a Catalog query from a parent platform: replies 404 when the platform is
     * unknown, otherwise 200 OK followed by one catalog message per shared channel and per
     * associated live stream (or a single empty catalog when there are none).
     */
    private void replyCatalogQuery(RequestEvent evt, Element rootElement, ParentPlatform parentPlatform)
            throws SipException, InvalidArgumentException, ParseException {
        if (parentPlatform == null) {
            responseAck(evt, Response.NOT_FOUND);
            return;
        }
        String sn = textOf(rootElement, "SN");
        if (sn == null) {
            replyBadRequest(evt, "Catalog", "SN");
            return;
        }
        responseAck(evt, Response.OK);

        FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
        String fromTag = fromHeader.getTag();
        List<ChannelReduce> channelReduces = storager.queryChannelListInParentPlatform(parentPlatform.getServerGBId());
        List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId());
        int size = channelReduces.size() + gbStreams.size();

        // Cascaded device channels.
        for (ChannelReduce channelReduce : channelReduces) {
            DeviceChannel deviceChannel = storager.queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId());
            cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromTag, size);
        }
        // Live streams exposed as channels.
        for (GbStream gbStream : gbStreams) {
            cmderFroPlatform.catalogQuery(toDeviceChannel(gbStream, parentPlatform), parentPlatform, sn, fromTag, size);
        }
        if (size == 0) {
            // Nothing shared: send an empty catalog.
            cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromTag, size);
        }
    }

    /**
     * Builds the channel description advertised to a parent platform for a live stream.
     */
    private DeviceChannel toDeviceChannel(GbStream gbStream, ParentPlatform parentPlatform) {
        DeviceChannel deviceChannel = new DeviceChannel();
        deviceChannel.setChannelId(gbStream.getGbId());
        deviceChannel.setName(gbStream.getName());
        deviceChannel.setLongitude(gbStream.getLongitude());
        deviceChannel.setLatitude(gbStream.getLatitude());
        deviceChannel.setDeviceId(parentPlatform.getDeviceGBId());
        deviceChannel.setManufacture("wvp-pro");
        deviceChannel.setStatus(gbStream.isStatus() ? 1 : 0);
        deviceChannel.setRegisterWay(1);
        deviceChannel.setCivilCode(config.getDomain());
        deviceChannel.setModel("live");
        deviceChannel.setOwner("wvp-pro");
        deviceChannel.setParental(0);
        deviceChannel.setSecrecy("0");
        return deviceChannel;
    }

    /**
     * Converts one {@code DeviceList} item of a catalog response into a {@link DeviceChannel}.
     * Missing, empty or non-numeric numeric fields fall back to their defaults.
     *
     * @param itemDevice      the catalog item element
     * @param channelDeviceId text of the item's {@code DeviceID} element
     */
    private DeviceChannel parseCatalogItem(Element itemDevice, String channelDeviceId) {
        Element nameElement = itemDevice.element("Name");
        Element statusElement = itemDevice.element("Status");
        String status = statusElement != null ? statusElement.getText() : "ON";

        DeviceChannel deviceChannel = new DeviceChannel();
        deviceChannel.setName(nameElement != null ? nameElement.getTextTrim() : "");
        deviceChannel.setChannelId(channelDeviceId);
        // ONLINE/OFFLINE are accepted for compatibility with the HIKVISION DS-7716N-E4 NVR.
        // Any other value leaves the status untouched.
        if ("ON".equals(status) || "On".equals(status) || "ONLINE".equals(status)) {
            deviceChannel.setStatus(1);
        } else if ("OFF".equals(status) || "Off".equals(status) || "OFFLINE".equals(status)) {
            deviceChannel.setStatus(0);
        }

        deviceChannel.setManufacture(getText(itemDevice, "Manufacturer"));
        deviceChannel.setModel(getText(itemDevice, "Model"));
        deviceChannel.setOwner(getText(itemDevice, "Owner"));
        deviceChannel.setCivilCode(getText(itemDevice, "CivilCode"));
        deviceChannel.setBlock(getText(itemDevice, "Block"));
        deviceChannel.setAddress(getText(itemDevice, "Address"));
        deviceChannel.setParental(parseIntOrDefault(getText(itemDevice, "Parental"), 0));
        deviceChannel.setParentId(getText(itemDevice, "ParentID"));
        deviceChannel.setSafetyWay(parseIntOrDefault(getText(itemDevice, "SafetyWay"), 0));
        deviceChannel.setRegisterWay(parseIntOrDefault(getText(itemDevice, "RegisterWay"), 1));
        deviceChannel.setCertNum(getText(itemDevice, "CertNum"));
        deviceChannel.setCertifiable(parseIntOrDefault(getText(itemDevice, "Certifiable"), 0));
        deviceChannel.setErrCode(parseIntOrDefault(getText(itemDevice, "ErrCode"), 0));
        deviceChannel.setEndTime(getText(itemDevice, "EndTime"));
        deviceChannel.setSecrecy(getText(itemDevice, "Secrecy"));
        deviceChannel.setIpAddress(getText(itemDevice, "IPAddress"));
        deviceChannel.setPort(parseIntOrDefault(getText(itemDevice, "Port"), 0));
        deviceChannel.setPassword(getText(itemDevice, "Password"));
        deviceChannel.setLongitude(parseDoubleOrDefault(getText(itemDevice, "Longitude"), 0.00));
        deviceChannel.setLatitude(parseDoubleOrDefault(getText(itemDevice, "Latitude"), 0.00));
        deviceChannel.setPTZType(parseIntOrDefault(getText(itemDevice, "PTZType"), 0));
        // Audio is assumed present; it is checked again at play time.
        deviceChannel.setHasAudio(true);
        return deviceChannel;
    }

    /**
     * Handles an Alarm message. A {@code Notify} root is stored (and published when the
     * device is online); a {@code Response} root is forwarded to the waiting callback.
     *
     * @param evt the incoming request event
     */
    private void processMessageAlarm(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            Device device = findDeviceOrReplyNotFound(evt, deviceId, "处理alarm设备报警信息未找到设备信息");
            if (device == null) {
                return;
            }
            Element rootElement = getRootElement(evt, device.getCharset());
            String channelId = textOf(rootElement, "DeviceID");
            if (channelId == null) {
                replyBadRequest(evt, "Alarm", "DeviceID");
                return;
            }
            String key = DeferredResultHolder.CALLBACK_CMD_ALARM + deviceId + channelId;
            responseAck(evt, Response.OK);

            String rootName = rootElement.getName();
            if ("Notify".equals(rootName)) {
                handleAlarmNotify(rootElement, deviceId, channelId);
            } else if ("Response".equals(rootName)) {
                publishJsonResult(key, rootElement);
            }
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            logger.error("处理Alarm消息失败", e);
        }
    }

    /**
     * Stores an alarm notification. Alarms with method {@code "4"} additionally record the
     * reported coordinates as a mobile position with report source "GPS Alarm".
     */
    private void handleAlarmNotify(Element rootElement, String deviceId, String channelId) {
        DeviceAlarm deviceAlarm = new DeviceAlarm();
        deviceAlarm.setDeviceId(deviceId);
        deviceAlarm.setChannelId(channelId);
        deviceAlarm.setAlarmPriority(getText(rootElement, "AlarmPriority"));
        deviceAlarm.setAlarmMethod(getText(rootElement, "AlarmMethod"));
        deviceAlarm.setAlarmTime(getText(rootElement, "AlarmTime"));
        String description = getText(rootElement, "AlarmDescription");
        deviceAlarm.setAlarmDescription(description == null ? "" : description);
        deviceAlarm.setLongitude(parseDoubleOrDefault(getText(rootElement, "Longitude"), 0.00));
        deviceAlarm.setLatitude(parseDoubleOrDefault(getText(rootElement, "Latitude"), 0.00));

        if ("4".equals(deviceAlarm.getAlarmMethod())) {
            MobilePosition mobilePosition = new MobilePosition();
            mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
            mobilePosition.setTime(deviceAlarm.getAlarmTime());
            mobilePosition.setLongitude(deviceAlarm.getLongitude());
            mobilePosition.setLatitude(deviceAlarm.getLatitude());
            mobilePosition.setReportSource("GPS Alarm");
            saveMobilePosition(deviceId, mobilePosition);
        }

        logger.debug("存储报警信息、报警分类");
        deviceAlarmService.add(deviceAlarm);
        if (offLineDetector.isOnline(deviceId)) {
            publisher.deviceAlarmEventPublish(deviceAlarm);
        }
    }

    /**
     * Handles a Keepalive message: a known device is acknowledged with 200 OK and an online
     * event is published; an unknown sender gets 404 and its dialog, if any, is deleted.
     *
     * @param evt the incoming request event
     */
    private void processMessageKeepAlive(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            Device device = storager.queryVideoDevice(deviceId);
            if (device != null) {
                responseAck(evt, Response.OK);
                publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE);
                return;
            }

            logger.warn("收到[ "+deviceId+" ]心跳信息, 但是设备不存在, 回复404");
            Response response = getMessageFactory().createResponse(Response.NOT_FOUND, evt.getRequest());
            ServerTransaction serverTransaction = getServerTransaction(evt);
            serverTransaction.sendResponse(response);
            if (serverTransaction.getDialog() != null) {
                serverTransaction.getDialog().delete();
            }
        } catch (ParseException | SipException | InvalidArgumentException e) {
            logger.error("处理KeepAlive消息失败", e);
        }
    }

    /**
     * Handles a RecordInfo (recording list) message. An empty result is published at once;
     * otherwise the records of this packet are cached in Redis for 90 seconds and a
     * {@link CheckForAllRecordsThread} is started once per device/SN to handle the cached
     * packets.
     * TODO the cache lifetime is hard-coded; align it with the DeferredResult timeout.
     *
     * @param evt the incoming request event
     */
    private void processMessageRecordInfo(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            Device device = findDeviceOrReplyNotFound(evt, deviceId, "处理RecordInfo设备录像列表Message时未找到设备信息");
            if (device == null) {
                return;
            }
            Element rootElement = getRootElement(evt, device.getCharset());
            String channelId = textOf(rootElement, "DeviceID");
            if (channelId == null) {
                replyBadRequest(evt, "RecordInfo", "DeviceID");
                return;
            }
            responseAck(evt, Response.OK);

            String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + channelId;
            RecordInfo recordInfo = new RecordInfo();
            recordInfo.setDeviceId(deviceId);
            recordInfo.setChannelId(channelId);
            recordInfo.setName(getText(rootElement, "Name"));
            recordInfo.setSumNum(parseIntOrDefault(getText(rootElement, "SumNum"), 0));
            String sn = getText(rootElement, "SN");

            Element recordListElement = rootElement.element("RecordList");
            if (recordListElement == null || recordInfo.getSumNum() == 0) {
                logger.info("无录像数据");
                publishResult(key, recordInfo);
                return;
            }

            List<RecordItem> recordList = parseRecordList(recordListElement);
            recordInfo.setRecordList(recordList);

            // Packets may arrive in parallel, so each is cached under its own key and a single
            // thread per device/SN is started to process them.
            String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn;
            String uuid = UUID.randomUUID().toString().replace("-", "");
            redis.set(cacheKey + "_" + uuid, recordList, 90);

            boolean firstPacket;
            List<String> startedKeys = threadNameList;
            synchronized (startedKeys) {
                // contains + add must be atomic so that only one thread is started per key.
                firstPacket = !startedKeys.contains(cacheKey);
                if (firstPacket) {
                    startedKeys.add(cacheKey);
                }
            }
            if (firstPacket) {
                CheckForAllRecordsThread chk = new CheckForAllRecordsThread(cacheKey, recordInfo);
                chk.setName(cacheKey);
                chk.setDeferredResultHolder(deferredResultHolder);
                chk.setRedis(redis);
                chk.setLogger(logger);
                chk.start();
                if (logger.isDebugEnabled()) {
                    logger.debug("Start Thread " + cacheKey + ".");
                }
            } else if (logger.isDebugEnabled()) {
                logger.debug("Thread " + cacheKey + " already started.");
            }
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            logger.error("处理RecordInfo消息失败", e);
        }
    }

    /**
     * Converts the children of a {@code RecordList} element into record items, skipping
     * entries without a {@code DeviceID}.
     */
    private List<RecordItem> parseRecordList(Element recordListElement) {
        List<RecordItem> recordList = new ArrayList<>();
        Iterator<Element> recordListIterator = recordListElement.elementIterator();
        logger.info("处理录像列表数据...");
        while (recordListIterator.hasNext()) {
            Element itemRecord = recordListIterator.next();
            if (itemRecord.element("DeviceID") == null) {
                logger.info("记录为空,下一个...");
                continue;
            }
            RecordItem record = new RecordItem();
            record.setDeviceId(getText(itemRecord, "DeviceID"));
            record.setName(getText(itemRecord, "Name"));
            record.setFilePath(getText(itemRecord, "FilePath"));
            record.setAddress(getText(itemRecord, "Address"));
            record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(getText(itemRecord, "StartTime")));
            record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(getText(itemRecord, "EndTime")));
            record.setSecrecy(parseIntOrDefault(getText(itemRecord, "Secrecy"), 0));
            record.setType(getText(itemRecord, "Type"));
            record.setRecorderId(getText(itemRecord, "RecorderID"));
            recordList.add(record);
        }
        return recordList;
    }

    /**
     * Handles a MediaStatus message. {@code NotifyType} 121 is logged as "media playback
     * finished": the playback stream of the device is stopped and a BYE is sent.
     *
     * @param evt the incoming request event
     */
    private void processMessageMediaStatus(RequestEvent evt) {
        try {
            String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
            Device device = findDeviceOrReplyNotFound(evt, deviceId, "处理MediaStatus消息时未找到设备信息");
            if (device == null) {
                return;
            }
            responseAck(evt, Response.OK);

            Element rootElement = getRootElement(evt);
            // Constant-first comparison: NotifyType may be absent.
            if ("121".equals(getText(rootElement, "NotifyType"))) {
                logger.info("媒体播放完毕,通知关流");
                StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, "*");
                if (streamInfo != null) {
                    redisCatchStorage.stopPlayback(streamInfo);
                    cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId());
                }
            }
        } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
            logger.error("处理MediaStatus消息失败", e);
        }
    }

    /**
     * Looks up the sending device; when it is unknown, logs {@code notFoundLog}, replies 404
     * and returns {@code null}.
     */
    private Device findDeviceOrReplyNotFound(RequestEvent evt, String deviceId, String notFoundLog)
            throws SipException, InvalidArgumentException, ParseException {
        Device device = storager.queryVideoDevice(deviceId);
        if (device == null) {
            logger.warn(notFoundLog);
            responseAck(evt, Response.NOT_FOUND);
        }
        return device;
    }

    /**
     * Replies 400 Bad Request for a message whose body lacks a required element.
     */
    private void replyBadRequest(RequestEvent evt, String cmdType, String missingTag)
            throws SipException, InvalidArgumentException, ParseException {
        logger.warn("{}消息缺少{}节点, 回复400", cmdType, missingTag);
        responseAck(evt, Response.BAD_REQUEST);
    }

    /**
     * Converts the XML body to JSON and delivers it to all callbacks waiting on {@code key}.
     */
    private void publishJsonResult(String key, Element rootElement)
            throws DocumentException, SipException, InvalidArgumentException, ParseException {
        JSONObject json = new JSONObject();
        XmlUtil.node2Json(rootElement, json);
        if (logger.isDebugEnabled()) {
            logger.debug(json.toJSONString());
        }
        publishResult(key, json);
    }

    /**
     * Delivers {@code data} to all callbacks waiting on {@code key}.
     */
    private void publishResult(String key, Object data) {
        RequestMessage msg = new RequestMessage();
        msg.setKey(key);
        msg.setData(data);
        deferredResultHolder.invokeAllResult(msg);
    }

    /**
     * Converts the WGS-84 coordinates of {@code mobilePosition} to BD-09, then stores the
     * position, first clearing the device's earlier positions when history saving is disabled.
     */
    private void saveMobilePosition(String deviceId, MobilePosition mobilePosition) {
        BaiduPoint bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()),
                String.valueOf(mobilePosition.getLatitude()));
        logger.info("百度坐标:" + bp.getBdLng() + ", " + bp.getBdLat());
        mobilePosition.setGeodeticSystem("BD-09");
        mobilePosition.setCnLng(bp.getBdLng());
        mobilePosition.setCnLat(bp.getBdLat());
        if (!userSetup.getSavePositionHistory()) {
            storager.clearMobilePositionsByDeviceId(deviceId);
        }
        storager.insertMobilePosition(mobilePosition);
    }

    /**
     * Returns the user part of the SIP URI carried by the given address header.
     */
    private static String sipUserOf(RequestEvent evt, String headerName) {
        HeaderAddress header = (HeaderAddress) evt.getRequest().getHeader(headerName);
        return ((SipURI) header.getAddress().getURI()).getUser();
    }

    /**
     * Returns the untrimmed text of the named child element, or {@code null} when absent.
     */
    private static String textOf(Element parent, String tag) {
        Element child = parent.element(tag);
        return child == null ? null : child.getText();
    }

    /**
     * Parses {@code text} as an int, returning {@code fallback} when it is null, empty or
     * not a valid integer.
     */
    private static int parseIntOrDefault(String text, int fallback) {
        if (StringUtils.isEmpty(text)) {
            return fallback;
        }
        try {
            return Integer.parseInt(text.trim());
        } catch (NumberFormatException e) {
            logger.warn("无法解析整数值: {}", text);
            return fallback;
        }
    }

    /**
     * Parses {@code text} as a double, returning {@code fallback} when
     * {@link NumericUtil#isDouble} rejects it.
     */
    private static double parseDoubleOrDefault(String text, double fallback) {
        return NumericUtil.isDouble(text) ? Double.parseDouble(text) : fallback;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
New file @@ -0,0 +1,384 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;

import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.GpsUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.Iterator;

/**
 * Processor for SIP NOTIFY requests.
 *
 * <p>Registers itself with the {@link SIPProcessorObserver} under the method name
 * {@code NOTIFY} and dispatches on the {@code CmdType} element of the XML body:
 * {@code Catalog}, {@code Alarm} and {@code MobilePosition} have dedicated handlers,
 * anything else is simply acknowledged with 200 OK.
 *
 * @author lawrencehj
 * @date 2021-01-27
 */
@Component
public class NotifyRequestProcessor extends SIPRequestProcessorAbstract {

    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestProcessor.class);

    @Autowired
    private UserSetup userSetup;

    @Autowired
    private IVideoManagerStorager storager;

    @Autowired
    private IRedisCatchStorage redisCatchStorage;

    @Autowired
    private EventPublisher publisher;

    @Autowired
    private DeviceOffLineDetector offLineDetector;

    private static final String NOTIFY_CATALOG = "Catalog";
    private static final String NOTIFY_ALARM = "Alarm";
    private static final String NOTIFY_MOBILE_POSITION = "MobilePosition";

    private String method = "NOTIFY";

    @Autowired
    private SIPProcessorObserver sipProcessorObserver;

    /**
     * Subscribes this processor to NOTIFY requests once all dependencies are injected.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        sipProcessorObserver.addRequestProcessor(method, this);
    }

    /**
     * Dispatches a NOTIFY request to the handler matching its {@code CmdType}.
     *
     * @param evt the incoming request event
     */
    @Override
    public void process(RequestEvent evt) {
        try {
            Element rootElement = getRootElement(evt);
            String cmd = XmlUtil.getText(rootElement, "CmdType");

            if (NOTIFY_CATALOG.equals(cmd)) {
                logger.info("接收到Catalog通知");
                processNotifyCatalogList(evt);
            } else if (NOTIFY_ALARM.equals(cmd)) {
                logger.info("接收到Alarm通知");
                processNotifyAlarm(evt);
            } else if (NOTIFY_MOBILE_POSITION.equals(cmd)) {
                logger.info("接收到MobilePosition通知");
                processNotifyMobilePosition(evt);
            } else {
                logger.info("接收到消息:" + cmd);
                responseAck(evt, Response.OK);
            }
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            logger.error("Failed to process NOTIFY request", e);
        }
    }

    /**
     * Handles a MobilePosition NOTIFY: stores the reported position and replies 200 OK.
     * A body without a DeviceID or without numeric Longitude/Latitude is rejected with
     * 400 Bad Request instead of failing with an unchecked exception.
     *
     * @param evt the incoming request event
     */
    private void processNotifyMobilePosition(RequestEvent evt) {
        try {
            Element rootElement = getRootElement(evt);
            String deviceId = readDeviceId(rootElement);
            String longitude = XmlUtil.getText(rootElement, "Longitude");
            String latitude = XmlUtil.getText(rootElement, "Latitude");
            if (deviceId == null || !isDouble(longitude) || !isDouble(latitude)) {
                logger.warn("MobilePosition notify is missing DeviceID or valid coordinates, replying 400");
                responseAck(evt, Response.BAD_REQUEST);
                return;
            }

            MobilePosition mobilePosition = new MobilePosition();
            Device device = storager.queryVideoDevice(deviceId);
            if (device != null && !StringUtils.isEmpty(device.getName())) {
                mobilePosition.setDeviceName(device.getName());
            }
            mobilePosition.setDeviceId(deviceId);
            mobilePosition.setTime(XmlUtil.getText(rootElement, "Time"));
            mobilePosition.setLongitude(Double.parseDouble(longitude));
            mobilePosition.setLatitude(Double.parseDouble(latitude));
            mobilePosition.setSpeed(readDouble(rootElement, "Speed", 0.0));
            mobilePosition.setDirection(readDouble(rootElement, "Direction", 0.0));
            mobilePosition.setAltitude(readDouble(rootElement, "Altitude", 0.0));
            mobilePosition.setReportSource("Mobile Position");
            storeMobilePosition(mobilePosition, deviceId);

            responseAck(evt, Response.OK);
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            logger.error("Failed to process MobilePosition notify", e);
        }
    }

    /**
     * Handles an Alarm NOTIFY: builds a {@link DeviceAlarm}, stores a position record for
     * alarm method "4", replies 200 OK and publishes the alarm when the device is online.
     * Unknown devices are answered with 404 so the request is never left unanswered.
     *
     * @param evt the incoming request event
     */
    private void processNotifyAlarm(RequestEvent evt) {
        try {
            Element rootElement = getRootElement(evt);
            String deviceId = readDeviceId(rootElement);
            if (deviceId == null) {
                logger.warn("Alarm notify is missing DeviceID, replying 400");
                responseAck(evt, Response.BAD_REQUEST);
                return;
            }
            Device device = storager.queryVideoDevice(deviceId);
            if (device == null) {
                logger.warn("Alarm notify from unknown device {}, replying 404", deviceId);
                responseAck(evt, Response.NOT_FOUND);
                return;
            }
            // Re-parse the body with the charset configured for this device.
            rootElement = getRootElement(evt, device.getCharset());

            DeviceAlarm deviceAlarm = new DeviceAlarm();
            deviceAlarm.setDeviceId(deviceId);
            deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
            deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
            deviceAlarm.setAlarmTime(XmlUtil.getText(rootElement, "AlarmTime"));
            String description = XmlUtil.getText(rootElement, "AlarmDescription");
            deviceAlarm.setAlarmDescription(description == null ? "" : description);
            deviceAlarm.setLongitude(readDouble(rootElement, "Longitude", 0.00));
            deviceAlarm.setLatitude(readDouble(rootElement, "Latitude", 0.00));

            // Constant-first comparison: AlarmMethod may be absent from the body.
            if ("4".equals(deviceAlarm.getAlarmMethod())) {
                MobilePosition mobilePosition = new MobilePosition();
                mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
                mobilePosition.setTime(deviceAlarm.getAlarmTime());
                mobilePosition.setLongitude(deviceAlarm.getLongitude());
                mobilePosition.setLatitude(deviceAlarm.getLatitude());
                mobilePosition.setReportSource("GPS Alarm");
                storeMobilePosition(mobilePosition, deviceId);
            }
            // TODO: persist alarm information and alarm classification

            responseAck(evt, Response.OK);
            if (offLineDetector.isOnline(deviceId)) {
                publisher.deviceAlarmEventPublish(deviceAlarm);
            }
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            logger.error("Failed to process Alarm notify", e);
        }
    }

    /**
     * Handles a Catalog NOTIFY: updates every channel listed under {@code DeviceList},
     * replies 200 OK and publishes an online event when the device is online.
     * Unknown devices are answered with 404; a body without {@code DeviceList} is
     * acknowledged with 200 OK since there is nothing to update.
     *
     * @param evt the incoming request event
     */
    private void processNotifyCatalogList(RequestEvent evt) {
        try {
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);

            Device device = storager.queryVideoDevice(deviceId);
            if (device == null) {
                logger.warn("Catalog notify from unknown device {}, replying 404", deviceId);
                responseAck(evt, Response.NOT_FOUND);
                return;
            }
            Element rootElement = getRootElement(evt, device.getCharset());
            Element deviceListElement = rootElement.element("DeviceList");
            if (deviceListElement == null) {
                responseAck(evt, Response.OK);
                return;
            }

            Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
            while (deviceListIterator.hasNext()) {
                Element itemDevice = deviceListIterator.next();
                if (itemDevice.element("DeviceID") == null) {
                    continue;
                }
                storager.updateChannel(device.getDeviceId(), toDeviceChannel(itemDevice));
            }

            responseAck(evt, Response.OK);
            if (offLineDetector.isOnline(deviceId)) {
                publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
            }
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            logger.error("Failed to process Catalog notify", e);
        }
    }

    /**
     * Maps one {@code DeviceList} item to a {@link DeviceChannel}. The caller guarantees the
     * item has a {@code DeviceID} child.
     */
    private DeviceChannel toDeviceChannel(Element itemDevice) {
        Element nameElement = itemDevice.element("Name");
        Element statusElement = itemDevice.element("Status");
        String channelName = nameElement != null ? nameElement.getTextTrim() : "";
        String status = statusElement != null ? statusElement.getTextTrim() : "ON";

        DeviceChannel deviceChannel = new DeviceChannel();
        deviceChannel.setName(channelName);
        deviceChannel.setChannelId(itemDevice.element("DeviceID").getTextTrim());
        // ONLINE / OFFLINE are accepted for compatibility with HIKVISION DS-7716N-E4 NVRs.
        if (status.equals("ON") || status.equals("On") || status.equals("ONLINE")) {
            deviceChannel.setStatus(1);
        }
        if (status.equals("OFF") || status.equals("Off") || status.equals("OFFLINE")) {
            deviceChannel.setStatus(0);
        }

        deviceChannel.setManufacture(XmlUtil.getText(itemDevice, "Manufacturer"));
        deviceChannel.setModel(XmlUtil.getText(itemDevice, "Model"));
        deviceChannel.setOwner(XmlUtil.getText(itemDevice, "Owner"));
        deviceChannel.setCivilCode(XmlUtil.getText(itemDevice, "CivilCode"));
        deviceChannel.setBlock(XmlUtil.getText(itemDevice, "Block"));
        deviceChannel.setAddress(XmlUtil.getText(itemDevice, "Address"));
        deviceChannel.setParental(readInt(itemDevice, "Parental", 0));
        deviceChannel.setParentId(XmlUtil.getText(itemDevice, "ParentID"));
        deviceChannel.setSafetyWay(readInt(itemDevice, "SafetyWay", 0));
        deviceChannel.setRegisterWay(readInt(itemDevice, "RegisterWay", 1));
        deviceChannel.setCertNum(XmlUtil.getText(itemDevice, "CertNum"));
        deviceChannel.setCertifiable(readInt(itemDevice, "Certifiable", 0));
        deviceChannel.setErrCode(readInt(itemDevice, "ErrCode", 0));
        deviceChannel.setEndTime(XmlUtil.getText(itemDevice, "EndTime"));
        deviceChannel.setSecrecy(XmlUtil.getText(itemDevice, "Secrecy"));
        deviceChannel.setIpAddress(XmlUtil.getText(itemDevice, "IPAddress"));
        deviceChannel.setPort(readInt(itemDevice, "Port", 0));
        deviceChannel.setPassword(XmlUtil.getText(itemDevice, "Password"));
        deviceChannel.setLongitude(readDouble(itemDevice, "Longitude", 0.00));
        deviceChannel.setLatitude(readDouble(itemDevice, "Latitude", 0.00));
        deviceChannel.setPTZType(readInt(itemDevice, "PTZType", 0));
        // Audio is assumed present; it is re-checked (including AAC) at play time.
        deviceChannel.setHasAudio(true);
        return deviceChannel;
    }

    /**
     * Converts the WGS-84 coordinates of the position to BD-09, then stores it; earlier
     * positions of the device are cleared first when position history is disabled.
     */
    private void storeMobilePosition(MobilePosition mobilePosition, String deviceId) {
        BaiduPoint bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()),
                String.valueOf(mobilePosition.getLatitude()));
        logger.info("百度坐标:" + bp.getBdLng() + ", " + bp.getBdLat());
        mobilePosition.setGeodeticSystem("BD-09");
        mobilePosition.setCnLng(bp.getBdLng());
        mobilePosition.setCnLat(bp.getBdLat());
        if (!userSetup.getSavePositionHistory()) {
            storager.clearMobilePositionsByDeviceId(deviceId);
        }
        storager.insertMobilePosition(mobilePosition);
    }

    /** Returns the trimmed text of the root's {@code DeviceID} child, or null when absent or empty. */
    private static String readDeviceId(Element rootElement) {
        Element deviceIdElement = rootElement.element("DeviceID");
        if (deviceIdElement == null) {
            return null;
        }
        String deviceId = deviceIdElement.getTextTrim();
        return StringUtils.isEmpty(deviceId) ? null : deviceId;
    }

    /** Null-safe wrapper around {@link NumericUtil#isDouble}. */
    private static boolean isDouble(String text) {
        return text != null && NumericUtil.isDouble(text);
    }

    /** Reads a child element as double, falling back to {@code defaultValue} when absent or not numeric. */
    private static double readDouble(Element element, String tag, double defaultValue) {
        String text = XmlUtil.getText(element, tag);
        return isDouble(text) ? Double.parseDouble(text) : defaultValue;
    }

    /** Reads a child element as int, falling back to {@code defaultValue} when absent, empty or not numeric. */
    private static int readInt(Element element, String tag, int defaultValue) {
        String text = XmlUtil.getText(element, tag);
        if (StringUtils.isEmpty(text)) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(text.trim());
        } catch (NumberFormatException e) {
            logger.warn("Ignoring non-numeric value '{}' for <{}>, using {}", text, tag, defaultValue);
            return defaultValue;
        }
    }

    /** Retained for API compatibility; this processor does not use a commander. */
    public void setCmder(SIPCommander cmder) {
    }

    public void setStorager(IVideoManagerStorager storager) {
        this.storager = storager;
    }

    public void setPublisher(EventPublisher publisher) {
        this.publisher = publisher;
    }

    /** Retained for API compatibility; this processor does not use RedisUtil directly. */
    public void setRedis(RedisUtil redis) {
    }

    /** Retained for API compatibility; this processor does not use a DeferredResultHolder. */
    public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) {
    }

    public void setOffLineDetector(DeviceOffLineDetector offLineDetector) {
        this.offLineDetector = offLineDetector;
    }

    public IRedisCatchStorage getRedisCatchStorage() {
        return redisCatchStorage;
    }

    public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
        this.redisCatchStorage = redisCatchStorage;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
New file @@ -0,0 +1,197 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;

import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.auth.DigestServerAuthenticationHelper;
import com.genersoft.iot.vmp.gb28181.auth.RegisterLogicHandler;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.WvpSipDate;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import gov.nist.javax.sip.RequestEventExt;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
import gov.nist.javax.sip.header.Expires;
import gov.nist.javax.sip.header.SIPDateHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.header.*;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.security.NoSuchAlgorithmException;
import java.text.ParseException;
import java.util.Calendar;
import java.util.Locale;

/**
 * Processor for SIP REGISTER requests.
 *
 * <p>Registers itself with the {@link SIPProcessorObserver} under the method name
 * {@code REGISTER}. A request without an Authorization header is challenged with 401,
 * a wrong password is answered with 403, a missing Expires header with 400, and a
 * valid request with 200 OK followed by an online/offline event.
 *
 * @author swwheihei
 * @date 2020-05-03
 */
@Component
public class RegisterRequestProcessor extends SIPRequestProcessorAbstract {

    private Logger logger = LoggerFactory.getLogger(RegisterRequestProcessor.class);

    public String method = "REGISTER";

    @Autowired
    private SipConfig sipConfig;

    @Autowired
    private RegisterLogicHandler handler;

    @Autowired
    private IVideoManagerStorager storager;

    @Autowired
    private EventPublisher publisher;

    @Autowired
    private SIPProcessorObserver sipProcessorObserver;

    /**
     * Subscribes this processor to REGISTER requests once all dependencies are injected.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        sipProcessorObserver.addRequestProcessor(method, this);
    }

    /**
     * Handles a REGISTER request.
     *
     * @param evt the incoming request event; cast to {@link RequestEventExt} to read the remote address
     */
    @Override
    public void process(RequestEvent evt) {
        try {
            RequestEventExt evtExt = (RequestEventExt) evt;
            String requestAddress = evtExt.getRemoteIpAddress() + ":" + evtExt.getRemotePort();
            logger.info("[{}] 收到注册请求,开始处理", requestAddress);
            Request request = evt.getRequest();

            Response response;
            // Register flag: 0 = no auth header or wrong password, 1 = registered, 2 = unregistered
            int registerFlag = 0;
            FromHeader fromHeader = (FromHeader) request.getHeader(FromHeader.NAME);
            AddressImpl address = (AddressImpl) fromHeader.getAddress();
            SipUri uri = (SipUri) address.getURI();
            String deviceId = uri.getUser();
            Device device = storager.queryVideoDevice(deviceId);
            AuthorizationHeader authorhead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME);

            // Verify the password; an empty configured password accepts any credentials.
            boolean passwordCorrect = false;
            if (authorhead != null) {
                passwordCorrect = new DigestServerAuthenticationHelper()
                        .doAuthenticatePlainTextPassword(request, sipConfig.getPassword());
            }
            if (StringUtils.isEmpty(sipConfig.getPassword())) {
                passwordCorrect = true;
            }

            if (authorhead == null) {
                // No Authorization header: challenge with 401
                logger.info("[{}] 未携带授权头 回复401", requestAddress);
                response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request);
                new DigestServerAuthenticationHelper()
                        .generateChallenge(getHeaderFactory(), response, sipConfig.getDomain());
            } else if (!passwordCorrect) {
                // Registration failed
                response = getMessageFactory().createResponse(Response.FORBIDDEN, request);
                response.setReasonPhrase("wrong password");
                logger.info("[{}] 密码/SIP服务器ID错误, 回复403", requestAddress);
            } else {
                ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME);
                if (expiresHeader == null) {
                    replyAndReleaseDialog(evt, getMessageFactory().createResponse(Response.BAD_REQUEST, request));
                    return;
                }

                // Authorization header present and password correct
                response = getMessageFactory().createResponse(Response.OK, request);
                // Date header, using the project's own WvpSipDate
                SIPDateHeader dateHeader = new SIPDateHeader();
                dateHeader.setDate(new WvpSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis()));
                response.addHeader(dateHeader);
                // Echo the Contact header when the request carries one
                Header contactHeader = request.getHeader(ContactHeader.NAME);
                if (contactHeader != null) {
                    response.addHeader(contactHeader);
                }
                response.addHeader(expiresHeader);

                // Resolve the device's address: prefer received/rport, else the Via host/port
                ViaHeader viaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
                String received = viaHeader.getReceived();
                int rPort = viaHeader.getRPort();
                if (StringUtils.isEmpty(received) || rPort == -1) {
                    received = viaHeader.getHost();
                    rPort = viaHeader.getPort();
                }

                if (device == null) {
                    device = new Device();
                    device.setStreamMode("UDP");
                    device.setCharset("gb2312");
                    device.setDeviceId(deviceId);
                    device.setFirsRegister(true);
                }
                device.setIp(received);
                device.setPort(rPort);
                device.setHostAddress(received.concat(":").concat(String.valueOf(rPort)));

                if (expiresHeader.getExpires() == 0) {
                    // Unregister
                    registerFlag = 2;
                } else {
                    // Register
                    device.setExpires(expiresHeader.getExpires());
                    registerFlag = 1;
                    // Transport tokens are compared case-insensitively
                    boolean isTcp = "TCP".equalsIgnoreCase(viaHeader.getTransport());
                    device.setTransport(isTcp ? "TCP" : "UDP");
                }
            }

            replyAndReleaseDialog(evt, response);

            if (registerFlag == 1) {
                logger.info("[{}] 注册成功! deviceId:{}", requestAddress, device.getDeviceId());
                publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER);
                // Refresh device and channels on every register so replaced/updated devices are picked up
                handler.onRegister(device);
            } else if (registerFlag == 2) {
                logger.info("[{}] 注销成功! deviceId:{}", requestAddress, device.getDeviceId());
                publisher.outlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_OUTLINE_UNREGISTER);
            }
        } catch (SipException | InvalidArgumentException | NoSuchAlgorithmException | ParseException e) {
            logger.error("Failed to process REGISTER request", e);
        }
    }

    /** Sends the response on the request's server transaction and deletes its dialog, if any. */
    private void replyAndReleaseDialog(RequestEvent evt, Response response)
            throws SipException, InvalidArgumentException {
        ServerTransaction serverTransaction = getServerTransaction(evt);
        serverTransaction.sendResponse(response);
        if (serverTransaction.getDialog() != null) {
            serverTransaction.getDialog().delete();
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
New file @@ -0,0 +1,75 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;

import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.header.ExpiresHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;

/**
 * Processor for SIP SUBSCRIBE requests.
 *
 * <p>Registers itself with the {@link SIPProcessorObserver} under the method name
 * {@code SUBSCRIBE} and answers every request with 200 OK carrying {@code Expires: 30}.
 *
 * @author swwheihei
 * @date 2020-05-03
 */
@Component
public class SubscribeRequestProcessor extends SIPRequestProcessorAbstract {

    private Logger logger = LoggerFactory.getLogger(SubscribeRequestProcessor.class);

    private String method = "SUBSCRIBE";

    @Autowired
    private SIPProcessorObserver sipProcessorObserver;

    /**
     * Subscribes this processor to SUBSCRIBE requests once all dependencies are injected.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        sipProcessorObserver.addRequestProcessor(method, this);
    }

    /**
     * Handles a SUBSCRIBE request.
     *
     * @param evt the incoming request event
     */
    @Override
    public void process(RequestEvent evt) {
        Request request = evt.getRequest();
        try {
            Response response = getMessageFactory().createResponse(200, request);
            if (response == null) {
                logger.warn("Could not create a response for the SUBSCRIBE request");
                return;
            }
            ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
            response.setExpires(expireHeader);
            logger.info("response : " + response.toString());

            ServerTransaction transaction = getServerTransaction(evt);
            if (transaction != null) {
                transaction.sendResponse(response);
                // The transaction may have no dialog; same guard as the REGISTER processor.
                if (transaction.getDialog() != null) {
                    transaction.getDialog().delete();
                }
                transaction.terminate();
            } else {
                logger.info("processRequest serverTransactionId is null.");
            }
        } catch (ParseException | SipException | InvalidArgumentException e) {
            logger.error("Failed to process SUBSCRIBE request", e);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/ISIPResponseProcessor.java
New file @@ -0,0 +1,15 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.response;

import javax.sip.ResponseEvent;

/**
 * Handles SIP protocol response messages received from IP cameras.
 *
 * @author swwheihei
 * @date 2020-05-03 16:42:22
 */
public interface ISIPResponseProcessor {

    /**
     * Processes one SIP response event.
     *
     * @param evt the response event to handle
     */
    void process(ResponseEvent evt);

}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java
New file @@ -0,0 +1,49 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;

import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.sip.ResponseEvent;

/**
 * Processor for responses to BYE requests.
 *
 * <p>Registers itself with the {@link SIPProcessorObserver} under the method name
 * {@code BYE}; the response itself is only logged.
 *
 * @author swwheihei
 * @date 2020-05-03
 */
@Component
public class ByeResponseProcessor extends SIPResponseProcessorAbstract {

    private final static Logger logger = LoggerFactory.getLogger(ByeResponseProcessor.class);

    private String method = "BYE";

    @Autowired
    private SipLayer sipLayer;

    @Autowired
    private SipConfig config;

    @Autowired
    private SIPProcessorObserver sipProcessorObserver;

    /**
     * Subscribes this processor to BYE responses once all dependencies are injected.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        sipProcessorObserver.addResponseProcessor(method, this);
    }

    /**
     * Handles a BYE response. No processing is implemented yet; the event is logged
     * through SLF4J (instead of stdout) so it honours the application's log configuration.
     *
     * @param evt the response event
     */
    @Override
    public void process(ResponseEvent evt) {
        // TODO Auto-generated method stub
        logger.info("收到bye");
    }

}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java
New file @@ -0,0 +1,47 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;

import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.sip.ResponseEvent;

/**
 * Processor for responses to CANCEL requests.
 *
 * <p>Registers itself with the {@link SIPProcessorObserver} under the method name
 * {@code CANCEL}; responses are currently ignored.
 *
 * @author panlinlin
 * @date 2021-11-05 16:35
 */
@Component
public class CancelResponseProcessor extends SIPResponseProcessorAbstract {

    @Autowired
    private SIPProcessorObserver sipProcessorObserver;

    @Autowired
    private SipConfig config;

    @Autowired
    private SipLayer sipLayer;

    private String method = "CANCEL";

    /**
     * Subscribes this processor to CANCEL responses once all dependencies are injected.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        sipProcessorObserver.addResponseProcessor(method, this);
    }

    /**
     * Handles a CANCEL response. Intentionally a no-op for now.
     *
     * @param evt the response event
     */
    @Override
    public void process(ResponseEvent evt) {
        // TODO Auto-generated method stub
    }

}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java
New file @@ -0,0 +1,97 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;

import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import gov.nist.javax.sip.ResponseEventExt;
import gov.nist.javax.sip.stack.SIPDialog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.CSeqHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;

/**
 * Processor for responses to INVITE requests.
 *
 * <p>Registers itself with the {@link SIPProcessorObserver} under the method name
 * {@code INVITE}. A 200 OK is answered with an ACK addressed to the remote IP and port
 * the response arrived from; every other status code (including 100 Trying) is ignored.
 *
 * @author panlinlin
 * @date 2021-11-05 16:40
 */
@Component
public class InviteResponseProcessor extends SIPResponseProcessorAbstract {

    private final static Logger logger = LoggerFactory.getLogger(InviteResponseProcessor.class);

    private String method = "INVITE";

    @Autowired
    private SipLayer sipLayer;

    @Autowired
    private SipConfig config;

    @Autowired
    private SIPProcessorObserver sipProcessorObserver;

    /**
     * Subscribes this processor to INVITE responses once all dependencies are injected.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        sipProcessorObserver.addResponseProcessor(method, this);
    }

    @Autowired
    private VideoStreamSessionManager streamSession;

    /**
     * Handles an INVITE response; sends the ACK for a 200 OK.
     *
     * @param evt the response event; cast to {@link ResponseEventExt} to read the remote address
     */
    @Override
    public void process(ResponseEvent evt) {
        try {
            Response response = evt.getResponse();
            // Only a successful response needs an ACK; 100 Trying and others are not answered.
            if (response.getStatusCode() != Response.OK) {
                return;
            }

            SIPDialog dialog = (SIPDialog) evt.getDialog();
            CSeqHeader cseq = (CSeqHeader) response.getHeader(CSeqHeader.NAME);
            if (dialog == null || cseq == null) {
                logger.warn("Cannot ACK INVITE 200 OK: response has no dialog or CSeq header");
                return;
            }

            ResponseEventExt event = (ResponseEventExt) evt;
            Request reqAck = dialog.createAck(cseq.getSeqNumber());
            SipURI requestURI = (SipURI) reqAck.getRequestURI();
            try {
                requestURI.setHost(event.getRemoteIpAddress());
            } catch (ParseException e) {
                // Keep the URI host produced by createAck and still send the ACK.
                logger.warn("Could not set ACK request host to " + event.getRemoteIpAddress(), e);
            }
            requestURI.setPort(event.getRemotePort());
            reqAck.setRequestURI(requestURI);
            logger.info("向 " + event.getRemoteIpAddress() + ":" + event.getRemotePort() + "回复ack");

            dialog.sendAck(reqAck);
        } catch (InvalidArgumentException | SipException e) {
            logger.error("Failed to ACK INVITE response", e);
        }
    }

}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
New file @@ -0,0 +1,104 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;

import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.sip.ResponseEvent;
import javax.sip.header.CallIdHeader;
import javax.sip.header.WWWAuthenticateHeader;
import javax.sip.message.Response;

/**
 * Processor for responses to REGISTER requests sent to parent platforms.
 *
 * <p>Registers itself with the {@link SIPProcessorObserver} under the method name
 * {@code REGISTER}. A 401 triggers a second register attempt with the received
 * challenge; a 200 updates the platform status in storage and the Redis cache.
 *
 * @author swwheihei
 * @date 2020-05-03
 */
@Component
public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {

    private Logger logger = LoggerFactory.getLogger(RegisterResponseProcessor.class);

    private String method = "REGISTER";

    @Autowired
    private ISIPCommanderForPlatform sipCommanderForPlatform;

    @Autowired
    private IVideoManagerStorager storager;

    @Autowired
    private IRedisCatchStorage redisCatchStorage;

    @Autowired
    private SIPProcessorObserver sipProcessorObserver;

    /**
     * Subscribes this processor to REGISTER responses once all dependencies are injected.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        sipProcessorObserver.addResponseProcessor(method, this);
    }

    /**
     * Handles a REGISTER response.
     *
     * @param evt the response event
     */
    @Override
    public void process(ResponseEvent evt) {
        Response response = evt.getResponse();
        CallIdHeader callIdHeader = (CallIdHeader) response.getHeader(CallIdHeader.NAME);
        if (callIdHeader == null) {
            logger.warn("REGISTER response without Call-ID header, ignored");
            return;
        }
        String callId = callIdHeader.getCallId();

        String platformGBId = redisCatchStorage.queryPlatformRegisterInfo(callId);
        if (platformGBId == null) {
            logger.info(String.format("未找到callId: %s 的注册/注销平台id", callId ));
            return;
        }

        ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformGBId);
        if (parentPlatformCatch == null) {
            logger.warn(String.format("收到 %s 的注册/注销%S请求, 但是平台缓存信息未查询到!!!", platformGBId, response.getStatusCode()));
            return;
        }

        // Check the cached platform BEFORE reading its Expires value to derive the action.
        ParentPlatform parentPlatform = parentPlatformCatch.getParentPlatform();
        if (parentPlatform == null) {
            logger.warn(String.format("收到 %s %s的%S请求, 但是平台信息未查询到!!!", platformGBId, "注册/注销", response.getStatusCode()));
            return;
        }
        // An Expires of "0" in the cache marks an unregister in progress.
        String action = "0".equals(parentPlatform.getExpires()) ? "注销" : "注册";
        logger.info(String.format("收到 %s %s的%S响应", platformGBId, action, response.getStatusCode() ));

        if (response.getStatusCode() == 401) {
            WWWAuthenticateHeader www = (WWWAuthenticateHeader) response.getHeader(WWWAuthenticateHeader.NAME);
            sipCommanderForPlatform.register(parentPlatform, callId, www, null, null);
        } else if (response.getStatusCode() == 200) {
            // Register / unregister succeeded
            logger.info(String.format("%s %s成功", platformGBId, action));
            redisCatchStorage.delPlatformRegisterInfo(callId);
            boolean registered = "注册".equals(action);
            parentPlatform.setStatus(registered);
            // Restore Expires and id from storage so the "0" used while unregistering is not persisted
            ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId);
            if (parentPlatformTmp != null) {
                parentPlatform.setExpires(parentPlatformTmp.getExpires());
                parentPlatform.setId(parentPlatformTmp.getId());
            } else {
                logger.warn("Platform {} not found in storage; keeping cached Expires and id", platformGBId);
            }
            storager.updateParentPlatformStatus(platformGBId, registered);

            redisCatchStorage.updatePlatformRegister(parentPlatform);
            redisCatchStorage.updatePlatformKeepalive(parentPlatform);

            parentPlatformCatch.setParentPlatform(parentPlatform);
            redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
        }
    }

}
src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java
New file @@ -0,0 +1,24 @@
package com.genersoft.iot.vmp.service;

import com.genersoft.iot.vmp.gb28181.bean.Device;

/**
 * Device-related business operations.
 */
public interface IDeviceService {

    /**
     * Adds a catalog subscription for the device.
     *
     * @param device device information
     * @return a success flag; the exact conditions are defined by the implementation
     */
    boolean addCatalogSubscribe(Device device);

    /**
     * Removes the catalog subscription of the device.
     *
     * @param device device information
     * @return a success flag; the exact conditions are defined by the implementation
     */
    boolean removeCatalogSubscribe(Device device);

}
src/main/java/com/genersoft/iot/vmp/service/bean/CatalogSubscribeTask.java
New file @@ -0,0 +1,48 @@
package com.genersoft.iot.vmp.service.bean;

import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sip.ResponseEvent;

/**
 * Runnable that sends one catalog subscription for a device and logs the outcome.
 * The response body is parsed as gb2312 XML and its {@code Result} element decides
 * between success and failure.
 */
public class CatalogSubscribeTask implements Runnable{

    private final Logger logger = LoggerFactory.getLogger(CatalogSubscribeTask.class);
    private Device device;
    private ISIPCommander sipCommander;

    /**
     * @param device       the device to subscribe to
     * @param sipCommander the commander used to send the subscription
     */
    public CatalogSubscribeTask(Device device, ISIPCommander sipCommander) {
        this.device = device;
        this.sipCommander = sipCommander;
    }

    /**
     * Sends the catalog subscription. Unparseable or incomplete responses are logged as
     * failures instead of raising a NullPointerException inside the callback.
     */
    @Override
    public void run() {
        sipCommander.catalogSubscribe(device, eventResult -> {
            ResponseEvent event = (ResponseEvent) eventResult.event;
            Element rootElement;
            try {
                rootElement = XmlUtil.getRootElement(event.getResponse().getRawContent(), "gb2312");
            } catch (DocumentException e) {
                logger.warn("目录订阅失败: {}-响应内容解析失败", device.getDeviceId(), e);
                return;
            }
            Element resultElement = rootElement == null ? null : rootElement.element("Result");
            if (resultElement == null) {
                logger.warn("目录订阅失败: {}-响应中缺少Result", device.getDeviceId());
                return;
            }
            String result = resultElement.getText();
            if (result.toUpperCase().equals("OK")){
                // Success
                logger.info("目录订阅成功: {}", device.getDeviceId());
            }else {
                // Failure reported by the device
                logger.info("目录订阅失败: {}-{}", device.getDeviceId(), result);
            }
        },eventResult -> {
            // Failure: the signalling could not be sent
            logger.warn("目录订阅失败: {}-信令发送失败", device.getDeviceId());
        });
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
New file @@ -0,0 +1,61 @@
package com.genersoft.iot.vmp.service.impl;

import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.bean.CatalogSubscribeTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Default {@link IDeviceService}: keeps a device's catalog subscription alive by
 * re-sending it on a cron schedule managed by {@link DynamicTask}, keyed by device id.
 */
@Service
public class DeviceServiceImpl implements IDeviceService {

    private final static Logger logger = LoggerFactory.getLogger(DeviceServiceImpl.class);

    /** Seconds before expiry at which the subscription is refreshed. */
    private static final int REFRESH_LEAD_SECONDS = 60;

    @Autowired
    private DynamicTask dynamicTask;

    @Autowired
    private ISIPCommander sipCommander;

    /**
     * Subscribes to the device's catalog immediately and schedules periodic refreshes.
     *
     * @param device device information
     * @return {@code false} when the device is null or its catalog subscribe cycle is
     *         negative, {@code true} once the refresh task has been scheduled
     */
    @Override
    public boolean addCatalogSubscribe(Device device) {
        if (device == null || device.getSubscribeCycleForCatalog() < 0) {
            return false;
        }
        // Subscribe right away
        CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander);
        catalogSubscribeTask.run();
        // Refresh ahead of expiry. Cycles too short to subtract the lead time are refreshed
        // at half the cycle, so the cron step is never zero or negative.
        int cycle = device.getSubscribeCycleForCatalog();
        int refreshSeconds = cycle > REFRESH_LEAD_SECONDS ? cycle - REFRESH_LEAD_SECONDS : cycle / 2;
        String cron = getCron(refreshSeconds);
        dynamicTask.startCron(device.getDeviceId(), catalogSubscribeTask, cron);
        return true;
    }

    /**
     * Cancels the scheduled catalog refresh of the device.
     *
     * @param device device information
     * @return {@code false} when the device is null or its catalog subscribe cycle is
     *         negative, {@code true} otherwise
     */
    @Override
    public boolean removeCatalogSubscribe(Device device) {
        if (device == null || device.getSubscribeCycleForCatalog() < 0) {
            return false;
        }
        dynamicTask.stopCron(device.getDeviceId());
        return true;
    }

    /**
     * Builds a cron expression that fires roughly every {@code time} seconds.
     * Up to 59 s the step is in seconds, up to 59 min in whole minutes, up to 59 h in
     * whole hours; anything larger falls back to every 10 minutes.
     *
     * @param time interval in seconds; values below 1 are treated as 1 because a cron
     *             step must be positive
     * @return a six-field cron expression
     */
    public String getCron(int time) {
        int seconds = Math.max(time, 1);
        if (seconds <= 59) {
            return "0/" + seconds +" * * * * ?";
        }else if (seconds <= 60* 59) {
            int minute = seconds/(60);
            return "0 0/" + minute +" * * * ?";
        }else if (seconds <= 60* 60* 59) {
            int hour = seconds/(60*60);
            return "0 0 0/" + hour +" * * ?";
        }else {
            return "0 0/10 * * * ?";
        }
    }
}