From bc0319b3f338412aa18f73bd749057e9ea3a7125 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期一, 13 十二月 2021 17:20:23 +0800 Subject: [PATCH] 将device信息写入redis以提高sip处理速度 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java | 227 +++++++++++++++++++++++++++++++++++++++++++++----------- 1 files changed, 182 insertions(+), 45 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index cec64cc..138ccaf 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1,25 +1,24 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; -import java.lang.reflect.Field; -import java.text.ParseException; -import java.util.HashSet; - -import javax.sip.*; -import javax.sip.address.SipURI; -import javax.sip.header.CallIdHeader; -import javax.sip.header.ViaHeader; -import javax.sip.message.Request; - import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; -import com.genersoft.iot.vmp.media.zlm.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider; +import com.genersoft.iot.vmp.gb28181.utils.DateUtil; +import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache; import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.SipStackImpl; import gov.nist.javax.sip.message.SIPRequest; @@ -29,20 +28,20 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.DependsOn; -import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; - -import com.genersoft.iot.vmp.conf.SipConfig; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider; -import com.genersoft.iot.vmp.gb28181.utils.DateUtil; -import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import org.springframework.util.StringUtils; +import javax.sip.*; +import javax.sip.address.SipURI; +import javax.sip.header.CallIdHeader; +import javax.sip.header.ViaHeader; +import javax.sip.message.Request; +import java.lang.reflect.Field; +import java.text.ParseException; +import java.util.HashSet; + /** - * @Description:璁惧鑳藉姏鎺ュ彛锛岀敤浜庡畾涔夎澶囩殑鎺у埗銆佹煡璇㈣兘鍔� + * @description:璁惧鑳藉姏鎺ュ彛锛岀敤浜庡畾涔夎澶囩殑鎺у埗銆佹煡璇㈣兘鍔� * @author: swwheihei * @date: 2020骞�5鏈�3鏃� 涓嬪崍9:22:48 */ @@ -55,12 +54,10 @@ @Autowired private SipConfig sipConfig; - @Lazy @Autowired @Qualifier(value="tcpSipProvider") private SipProviderImpl tcpSipProvider; - @Lazy @Autowired @Qualifier(value="udpSipProvider") private SipProviderImpl udpSipProvider; @@ -89,11 +86,6 @@ @Autowired private IMediaServerService mediaServerService; - private SIPDialog dialog; - - public SipConfig getSipConfig() { - return sipConfig; - } /** * 浜戝彴鏂瑰悜鏀炬帶鍒讹紝浣跨敤閰嶇疆鏂囦欢涓殑榛樿闀滃ご绉诲姩閫熷害 @@ -361,7 +353,7 @@ // StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); - content.append("o="+"00000"+" 0 0 IN IP4 "+ mediaServerItem.getSdpIp() +"\r\n"); + content.append("o="+ sipConfig.getId()+" 0 0 IN IP4 "+ mediaServerItem.getSdpIp() +"\r\n"); content.append("s=Play\r\n"); content.append("c=IN IP4 "+ mediaServerItem.getSdpIp() +"\r\n"); content.append("t=0 0\r\n"); @@ -427,8 +419,8 @@ mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); errorEvent.response(e); }), e ->{ - streamSession.put(device.getDeviceId(), channelId ,ssrcInfo.getSsrc(), finalStreamId, mediaServerItem.getId(),e.getClientTransaction()); - streamSession.put(device.getDeviceId(), channelId , e.getDialog()); + streamSession.put(device.getDeviceId(), channelId ,ssrcInfo.getSsrc(), finalStreamId, mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction()); + streamSession.put(device.getDeviceId(), channelId , e.dialog); }); @@ -468,7 +460,7 @@ StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); - content.append("o="+sipConfig.getId()+" 0 0 IN IP4 "+sipConfig.getIp()+"\r\n"); + content.append("o="+sipConfig.getId()+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); content.append("s=Playback\r\n"); content.append("u="+channelId+":0\r\n"); content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); @@ -532,12 +524,12 @@ CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); - Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader); + Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc()); transmitRequest(device, request, errorEvent, okEvent -> { - Dialog dialog = okEvent.getClientTransaction().getDialog(); - streamSession.put(device.getDeviceId(), channelId, ssrcInfo.getSsrc(), ssrcInfo.getStreamId(), mediaServerItem.getId(), okEvent.getClientTransaction()); - streamSession.put(device.getDeviceId(), channelId, dialog); + ResponseEvent responseEvent = (ResponseEvent) okEvent.event; + streamSession.put(device.getDeviceId(), channelId, ssrcInfo.getSsrc(), ssrcInfo.getStreamId(), mediaServerItem.getId(), responseEvent.getClientTransaction()); + streamSession.put(device.getDeviceId(), channelId, okEvent.dialog); }); } catch ( SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); @@ -575,7 +567,7 @@ StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); - content.append("o="+sipConfig.getId()+" 0 0 IN IP4 "+sipConfig.getIp()+"\r\n"); + content.append("o="+sipConfig.getId()+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); content.append("s=Download\r\n"); content.append("u="+channelId+":0\r\n"); content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); @@ -640,7 +632,7 @@ CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); - Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader); + Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc()); ClientTransaction transaction = transmitRequest(device, request, errorEvent); streamSession.put(device.getDeviceId(), channelId, ssrcInfo.getSsrc(), ssrcInfo.getStreamId(), mediaServerItem.getId(), transaction); @@ -667,6 +659,10 @@ ClientTransaction transaction = streamSession.getTransaction(deviceId, channelId); if (transaction == null) { logger.warn("[ {} -> {}]鍋滄瑙嗛娴佺殑鏃跺�欏彂鐜颁簨鍔″凡涓㈠け", deviceId, channelId); + SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>(); + if (okEvent != null) { + okEvent.response(eventResult); + } return; } SIPDialog dialog = streamSession.getDialog(deviceId, channelId); @@ -1167,7 +1163,7 @@ @Override public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) { // 娓呯┖閫氶亾 - storager.cleanChannelsForDevice(device.getDeviceId()); +// storager.cleanChannelsForDevice(device.getDeviceId()); try { StringBuffer catalogXml = new StringBuffer(200); catalogXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n"); @@ -1200,14 +1196,15 @@ * @param endTime 缁撴潫鏃堕棿,鏍煎紡瑕佹眰锛歽yyy-MM-dd HH:mm:ss */ @Override - public boolean recordInfoQuery(Device device, String channelId, String startTime, String endTime) { - + public boolean recordInfoQuery(Device device, String channelId, String startTime, String endTime, int sn, SipSubscribe.Event errorEvent) { + + try { StringBuffer recordInfoXml = new StringBuffer(200); recordInfoXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n"); recordInfoXml.append("<Query>\r\n"); recordInfoXml.append("<CmdType>RecordInfo</CmdType>\r\n"); - recordInfoXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n"); + recordInfoXml.append("<SN>" + sn + "</SN>\r\n"); recordInfoXml.append("<DeviceID>" + channelId + "</DeviceID>\r\n"); recordInfoXml.append("<StartTime>" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(startTime) + "</StartTime>\r\n"); recordInfoXml.append("<EndTime>" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(endTime) + "</EndTime>\r\n"); @@ -1224,7 +1221,7 @@ Request request = headerProvider.createMessageRequest(device, recordInfoXml.toString(), "z9hG4bK-ViaRecordInfo-" + tm, "fromRec" + tm, null, callIdHeader); - transmitRequest(device, request); + transmitRequest(device, request, errorEvent); } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); return false; @@ -1486,6 +1483,33 @@ } } + @Override + public boolean catalogSubscribe(Device device, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) { + try { + StringBuffer cmdXml = new StringBuffer(200); + cmdXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n"); + cmdXml.append("<Query>\r\n"); + cmdXml.append("<CmdType>Catalog</CmdType>\r\n"); + cmdXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n"); + cmdXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n"); + cmdXml.append("</Query>\r\n"); + + String tm = Long.toString(System.currentTimeMillis()); + + CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() + : udpSipProvider.getNewCallId(); + + Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" , callIdHeader); + transmitRequest(device, request, errorEvent, okEvent); + + return true; + + } catch ( NumberFormatException | ParseException | InvalidArgumentException | SipException e) { + e.printStackTrace(); + return false; + } + } + private ClientTransaction transmitRequest(Device device, Request request) throws SipException { return transmitRequest(device, request, null, null); @@ -1506,14 +1530,127 @@ CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); // 娣诲姞閿欒璁㈤槄 if (errorEvent != null) { - sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), errorEvent); + sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (eventResult -> { + errorEvent.response(eventResult); + sipSubscribe.removeErrorSubscribe(eventResult.callId); + })); } // 娣诲姞璁㈤槄 if (okEvent != null) { - sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent); + sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), eventResult ->{ + okEvent.response(eventResult); + sipSubscribe.removeOkSubscribe(eventResult.callId); + }); } clientTransaction.sendRequest(); return clientTransaction; } + + /** + * 鍥炴斁鏆傚仠 + */ + @Override + public void playPauseCmd(Device device, StreamInfo streamInfo) { + try { + + StringBuffer content = new StringBuffer(200); + content.append("PAUSE RTSP/1.0\r\n"); + content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n"); + content.append("PauseTime: now\r\n"); + Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); + logger.info(request.toString()); + ClientTransaction clientTransaction = null; + if ("TCP".equals(device.getTransport())) { + clientTransaction = tcpSipProvider.getNewClientTransaction(request); + } else if ("UDP".equals(device.getTransport())) { + clientTransaction = udpSipProvider.getNewClientTransaction(request); + } + if (clientTransaction != null) { + clientTransaction.sendRequest(); + } + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + } + } + + /** + * 鍥炴斁鎭㈠ + */ + @Override + public void playResumeCmd(Device device, StreamInfo streamInfo) { + try { + StringBuffer content = new StringBuffer(200); + content.append("PLAY RTSP/1.0\r\n"); + content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n"); + content.append("Range: npt=now-\r\n"); + Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); + logger.info(request.toString()); + ClientTransaction clientTransaction = null; + if ("TCP".equals(device.getTransport())) { + clientTransaction = tcpSipProvider.getNewClientTransaction(request); + } else if ("UDP".equals(device.getTransport())) { + clientTransaction = udpSipProvider.getNewClientTransaction(request); + } + + clientTransaction.sendRequest(); + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + } + } + + /** + * 鍥炴斁鎷栧姩鎾斁 + */ + @Override + public void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime) { + try { + StringBuffer content = new StringBuffer(200); + content.append("PLAY RTSP/1.0\r\n"); + content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n"); + content.append("Range: npt=" + Math.abs(seekTime) + "-\r\n"); + + Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); + logger.info(request.toString()); + ClientTransaction clientTransaction = null; + if ("TCP".equals(device.getTransport())) { + clientTransaction = tcpSipProvider.getNewClientTransaction(request); + } else if ("UDP".equals(device.getTransport())) { + clientTransaction = udpSipProvider.getNewClientTransaction(request); + } + + clientTransaction.sendRequest(); + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + } + } + + /** + * 鍥炴斁鍊嶉�熸挱鏀� + */ + @Override + public void playSpeedCmd(Device device, StreamInfo streamInfo, Double speed) { + try { + StringBuffer content = new StringBuffer(200); + content.append("PLAY RTSP/1.0\r\n"); + content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n"); + content.append("Scale: " + String.format("%.1f",speed) + "\r\n"); + Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); + logger.info(request.toString()); + ClientTransaction clientTransaction = null; + if ("TCP".equals(device.getTransport())) { + clientTransaction = tcpSipProvider.getNewClientTransaction(request); + } else if ("UDP".equals(device.getTransport())) { + clientTransaction = udpSipProvider.getNewClientTransaction(request); + } + + clientTransaction.sendRequest(); + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + } + } } -- Gitblit v1.8.0