648540858
2022-09-24 d7a1b94f905c5f28c9c8f2d48c3f9e28ebcf9cc4
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@@ -28,6 +29,8 @@
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import com.genersoft.iot.vmp.utils.DateUtil;
import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.*;
@@ -45,7 +48,7 @@
    private final Logger logger = LoggerFactory.getLogger(SIPCommanderFroPlatform.class);
    @Autowired
    private SIPRequestHeaderPlarformProvider headerProviderPlarformProvider;
    private SIPRequestHeaderPlarformProvider headerProviderPlatformProvider;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
@@ -72,6 +75,9 @@
    @Autowired
    private SipFactory sipFactory;
    @Autowired
    private SubscribeHolder subscribeHolder;
    @Override
    public boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
        return register(parentPlatform, null, null, errorEvent, okEvent, false, true);
@@ -96,7 +102,7 @@
                    callIdHeader = udpSipProvider.getNewCallId();
                }
                request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform,
                request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform,
                        redisCatchStorage.getCSEQ(), SipUtils.getNewFromTag(),
                        SipUtils.getNewViaTag(), callIdHeader, isRegister);
                // 将 callid 写入缓存, 等注册成功可以更新状态
@@ -118,7 +124,7 @@
            }else {
                CallIdHeader callIdHeader = parentPlatform.getTransport().equalsIgnoreCase("TCP") ? tcpSipProvider.getNewCallId()
                        : udpSipProvider.getNewCallId();
                request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, SipUtils.getNewFromTag(), null, callId, www, callIdHeader, isRegister);
                request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, SipUtils.getNewFromTag(), null, callId, www, callIdHeader, isRegister);
            }
            transmitRequest(parentPlatform, request, null, okEvent);
@@ -152,7 +158,7 @@
            CallIdHeader callIdHeader = parentPlatform.getTransport().equalsIgnoreCase("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            Request request = headerProviderPlarformProvider.createMessageRequest(
            Request request = headerProviderPlatformProvider.createMessageRequest(
                    parentPlatform,
                    keepaliveXml.toString(),
                    SipUtils.getNewFromTag(),
@@ -218,7 +224,7 @@
            CallIdHeader callIdHeader = parentPlatform.getTransport().equalsIgnoreCase("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, catalogXml.toString(), fromTag, SipUtils.getNewViaTag(), callIdHeader);
            Request request = headerProviderPlatformProvider.createMessageRequest(parentPlatform, catalogXml.toString(), fromTag, SipUtils.getNewViaTag(), callIdHeader);
            transmitRequest(parentPlatform, request);
        } catch (SipException | ParseException | InvalidArgumentException e) {
@@ -258,6 +264,7 @@
                if (channel.getParentId() != null) {
                    // 业务分组加上这一项即可,提高兼容性,
                    catalogXml.append("<ParentID>" + channel.getParentId() + "</ParentID>\r\n");
//                    catalogXml.append("<ParentID>" + parentPlatform.getDeviceGBId() + "/" + channel.getParentId() + "</ParentID>\r\n");
                }
                if (channel.getChannelId().length() == 20 && Integer.parseInt(channel.getChannelId().substring(10, 13)) == 216) {
                    // 虚拟组织增加BusinessGroupID字段
@@ -268,21 +275,23 @@
                    if (channel.getParental() == 0) {
                        catalogXml.append("<Status>" + (channel.getStatus() == 0 ? "OFF" : "ON") + "</Status>\r\n");
                    }
                }
                if (channel.getParental() == 0) {
                    // 通道项
                    catalogXml.append("<Manufacturer>" + channel.getManufacture() + "</Manufacturer>\r\n");
                    catalogXml.append("<Secrecy>" + channel.getSecrecy() + "</Secrecy>\r\n");
                    catalogXml.append("<RegisterWay>" + channel.getRegisterWay() + "</RegisterWay>\r\n");
                    String civilCode = channel.getCivilCode() == null?parentPlatform.getAdministrativeDivision() : channel.getCivilCode();
                    if (channel.getChannelType() != 2) {  // 业务分组/虚拟组织/行政区划 不设置以下属性
                        catalogXml.append("<Model>" + channel.getModel() + "</Model>\r\n");
                        catalogXml.append("<Owner> " + channel.getOwner()+ "</Owner>\r\n");
                        catalogXml.append("<CivilCode>" + channel.getCivilCode() + "</CivilCode>\r\n");
                        catalogXml.append("<Address>" + channel.getAddress() + "</Address>\r\n");
                        catalogXml.append("<Owner>" + parentPlatform.getDeviceGBId()+ "</Owner>\r\n");
                        catalogXml.append("<CivilCode>" + civilCode + "</CivilCode>\r\n");
                        if (channel.getAddress() == null) {
                            catalogXml.append("<Address></Address>\r\n");
                        }else {
                            catalogXml.append("<Address>" + channel.getAddress() + "</Address>\r\n");
                        }
                    }
                }
                catalogXml.append("</Item>\r\n");
            }
@@ -309,7 +318,7 @@
            CallIdHeader callIdHeader = parentPlatform.getTransport().equalsIgnoreCase("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, catalogXml, fromTag, SipUtils.getNewViaTag(), callIdHeader);
            Request request = headerProviderPlatformProvider.createMessageRequest(parentPlatform, catalogXml, fromTag, SipUtils.getNewViaTag(), callIdHeader);
            transmitRequest(parentPlatform, request, null, eventResult -> {
                int indexNext = index + parentPlatform.getCatalogGroup();
                sendCatalogResponse(channels, parentPlatform, sn, fromTag, indexNext);
@@ -349,7 +358,7 @@
            CallIdHeader callIdHeader = parentPlatform.getTransport().equalsIgnoreCase("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, deviceInfoXml.toString(), fromTag, SipUtils.getNewViaTag(), callIdHeader);
            Request request = headerProviderPlatformProvider.createMessageRequest(parentPlatform, deviceInfoXml.toString(), fromTag, SipUtils.getNewViaTag(), callIdHeader);
            transmitRequest(parentPlatform, request);
        } catch (SipException | ParseException | InvalidArgumentException e) {
@@ -387,7 +396,7 @@
            CallIdHeader callIdHeader = parentPlatform.getTransport().equalsIgnoreCase("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, deviceStatusXml.toString(), fromTag, SipUtils.getNewViaTag(), callIdHeader);
            Request request = headerProviderPlatformProvider.createMessageRequest(parentPlatform, deviceStatusXml.toString(), fromTag, SipUtils.getNewViaTag(), callIdHeader);
            transmitRequest(parentPlatform, request);
        } catch (SipException | ParseException | InvalidArgumentException e) {
@@ -422,11 +431,7 @@
            deviceStatusXml.append("<Altitude>" + gpsMsgInfo.getAltitude() + "</Altitude>\r\n");
            deviceStatusXml.append("</Notify>\r\n");
            CallIdHeader callIdHeader = parentPlatform.getTransport().equalsIgnoreCase("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            callIdHeader.setCallId(subscribeInfo.getCallId());
            sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> {
           sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> {
                logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
            }, null);
@@ -448,8 +453,8 @@
        if (parentPlatform == null) {
            return false;
        }
        logger.info("[发送 报警订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), deviceAlarm.getChannelId(),
                deviceAlarm.getLongitude(), deviceAlarm.getLatitude());
        logger.info("[发送报警通知] {}/{}->{},{}: {}", parentPlatform.getServerGBId(), deviceAlarm.getChannelId(),
                deviceAlarm.getLongitude(), deviceAlarm.getLatitude(), JSONObject.toJSON(deviceAlarm));
        try {
            String characterSet = parentPlatform.getCharacterSet();
            StringBuffer deviceStatusXml = new StringBuffer(600);
@@ -472,7 +477,7 @@
            CallIdHeader callIdHeader = parentPlatform.getTransport().equalsIgnoreCase("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, deviceStatusXml.toString(), SipUtils.getNewFromTag(), SipUtils.getNewViaTag(), callIdHeader);
            Request request = headerProviderPlatformProvider.createMessageRequest(parentPlatform, deviceStatusXml.toString(), SipUtils.getNewFromTag(), SipUtils.getNewViaTag(), callIdHeader);
            transmitRequest(parentPlatform, request);
        } catch (SipException | ParseException  e) {
@@ -524,18 +529,15 @@
        return true;
    }
    private void sendNotify(ParentPlatform parentPlatform, String catalogXmlContent,
    private ClientTransaction sendNotify(ParentPlatform parentPlatform, String catalogXmlContent,
                                   SubscribeInfo subscribeInfo, SipSubscribe.Event errorEvent,  SipSubscribe.Event okEvent )
            throws NoSuchFieldException, IllegalAccessException, SipException, ParseException, InvalidArgumentException {
      MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
        String characterSet = parentPlatform.getCharacterSet();
       // 设置编码, 防止中文乱码
      messageFactory.setDefaultContentEncodingCharset(characterSet);
        Dialog dialog  = subscribeInfo.getDialog();
        if (dialog == null || !dialog.getState().equals(DialogState.CONFIRMED)) {
            return;
        }
        SIPRequest notifyRequest = (SIPRequest)dialog.createRequest(Request.NOTIFY);
        SIPRequest notifyRequest = headerProviderPlatformProvider.createNotifyRequest(parentPlatform, catalogXmlContent, subscribeInfo);
        notifyRequest.getCSeqHeader().setSeqNumber(redisCatchStorage.getCSEQ());
        
@@ -555,26 +557,32 @@
        sipURI.setHost(parentPlatform.getServerIP());
        sipURI.setPort(parentPlatform.getServerPort());
//        ClientTransaction transaction = subscribeInfo.getClientTransaction();
//        if (transaction == null || transaction.getState().equals(TransactionState.COMPLETED)) {
//            if ("TCP".equals(parentPlatform.getTransport())) {
//                transaction = tcpSipProvider.getNewClientTransaction(notifyRequest);
//            } else if ("UDP".equals(parentPlatform.getTransport())) {
//                transaction = udpSipProvider.getNewClientTransaction(notifyRequest);
//            }
//        }
        ClientTransaction transaction = null;
        if ("TCP".equals(parentPlatform.getTransport())) {
            transaction = tcpSipProvider.getNewClientTransaction(notifyRequest);
        } else if ("UDP".equals(parentPlatform.getTransport())) {
            transaction = udpSipProvider.getNewClientTransaction(notifyRequest);
        }
        // 添加错误订阅
        if (errorEvent != null) {
            sipSubscribe.addErrorSubscribe(subscribeInfo.getCallId(), errorEvent);
            sipSubscribe.addErrorSubscribe(subscribeInfo.getRequest().getCallIdHeader().getCallId(), errorEvent);
        }
        // 添加订阅
        if (okEvent != null) {
            sipSubscribe.addOkSubscribe(subscribeInfo.getCallId(), okEvent);
            sipSubscribe.addOkSubscribe(subscribeInfo.getRequest().getCallIdHeader().getCallId(), okEvent);
        }
        if (transaction == null) {
            logger.error("平台{}的Transport错误:{}",parentPlatform.getServerGBId(), parentPlatform.getTransport());
            return;
        }
        dialog.sendRequest(transaction);
        transaction.sendRequest();
        return transaction;
    }
    private  String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, List<DeviceChannel> channels, int sumNum, String type, SubscribeInfo subscribeInfo) {
@@ -750,7 +758,7 @@
            // callid
            CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                    : udpSipProvider.getNewCallId();
            Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, recordXml.toString(), fromTag, SipUtils.getNewViaTag(), callIdHeader);
            Request request = headerProviderPlatformProvider.createMessageRequest(parentPlatform, recordXml.toString(), fromTag, SipUtils.getNewViaTag(), callIdHeader);
            transmitRequest(parentPlatform, request);
        } catch (SipException | ParseException | InvalidArgumentException e) {
@@ -769,36 +777,8 @@
            return false;
        }
        byte[] dialogByteArray = sendRtpItem.getDialog();
        if (dialogByteArray == null) {
            return false;
        }
        try{
            SIPDialog dialog = (SIPDialog) SerializeUtils.deSerialize(dialogByteArray);
            SipStack sipStack;
            if ("TCP".equals(platform.getTransport())) {
                sipStack = tcpSipProvider.getSipStack();
            } else {
                sipStack = udpSipProvider.getSipStack();
            }
            SIPDialog sipDialog = ((SipStackImpl) sipStack).putDialog(dialog);
            if (dialog != sipDialog) {
                dialog = sipDialog;
            }
            if ("TCP".equals(platform.getTransport())) {
                dialog.setSipProvider(tcpSipProvider);
            } else {
                dialog.setSipProvider(udpSipProvider);
            }
            Field sipStackField = SIPDialog.class.getDeclaredField("sipStack");
            sipStackField.setAccessible(true);
            sipStackField.set(dialog, sipStack);
            Field eventListenersField = SIPDialog.class.getDeclaredField("eventListeners");
            eventListenersField.setAccessible(true);
            eventListenersField.set(dialog, new HashSet<>());
            SIPRequest messageRequest = (SIPRequest)dialog.createRequest(Request.MESSAGE);
            String characterSet = platform.getCharacterSet();
            StringBuffer mediaStatusXml = new StringBuffer(200);
            mediaStatusXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet + "\"?>\r\n");
@@ -808,6 +788,10 @@
            mediaStatusXml.append("<DeviceID>" + sendRtpItem.getChannelId() + "</DeviceID>\r\n");
            mediaStatusXml.append("<NotifyType>121</NotifyType>\r\n");
            mediaStatusXml.append("</Notify>\r\n");
            SIPRequest messageRequest = (SIPRequest)headerProviderPlatformProvider.createMessageRequest(platform, mediaStatusXml.toString(),
                    sendRtpItem);
            ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
            messageRequest.setContent(mediaStatusXml.toString(), contentTypeHeader);
            SipURI sipURI = (SipURI) messageRequest.getRequestURI();
@@ -819,17 +803,15 @@
            }else {
                clientTransaction = udpSipProvider.getNewClientTransaction(messageRequest);
            }
            dialog.sendRequest(clientTransaction);
            clientTransaction.sendRequest();
        } catch (SipException e) {
            e.printStackTrace();
            return false;
        } catch (ParseException e) {
            e.printStackTrace();
            return false;
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvalidArgumentException e) {
            throw new RuntimeException(e);
        }
        return true;
@@ -843,61 +825,46 @@
        }
        SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), null, null, callId);
        if (sendRtpItem != null) {
            String mediaServerId = sendRtpItem.getMediaServerId();
            MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
            if (mediaServerItem != null) {
                mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
                zlmrtpServerFactory.closeRTPServer(mediaServerItem, sendRtpItem.getStreamId());
            streamByeCmd(platform, sendRtpItem);
        }
    }
    /**
     * Sends a SIP BYE to the upstream platform for the stream described by {@code sendRtpItem}.
     *
     * <p>Steps visible in this method: both arguments are null-checked (a null argument is
     * logged and the method returns); the media server is looked up by
     * {@code sendRtpItem.getMediaServerId()} and, when found, its SSRC is released and the
     * RTP server for {@code sendRtpItem.getStreamId()} is closed; finally a BYE request is
     * built via {@code headerProviderPlatformProvider.createByeRequest} and sent through a new
     * client transaction obtained from the TCP or UDP provider.
     *
     * <p>NOTE(review): this span appears to interleave two generations of the body (it looks
     * like a unified diff whose +/- markers were stripped): {@code byeRequest} and
     * {@code clientTransaction} are each declared twice. Confirm against the real file which
     * lines are live before changing any logic here.
     *
     * @param platform    upstream platform to notify; when null the method logs and returns
     * @param sendRtpItem send-RTP record for the stream; when null the method logs and returns
     */
    @Override
    public void streamByeCmd(ParentPlatform platform, SendRtpItem sendRtpItem) {
        // Guard clauses: nothing can be sent without both the stream record and the platform.
        if (sendRtpItem == null ) {
            logger.info("[向上级发送BYE], sendRtpItem 为NULL");
            return;
        }
        if (platform == null) {
            logger.info("[向上级发送BYE], platform 为NULL");
            return;
        }
        logger.info("[向上级发送BYE], {}/{}", platform.getServerGBId(), sendRtpItem.getChannelId());
        // Media-side cleanup runs before the BYE is sent and is skipped when the media server
        // cannot be found.
        String mediaServerId = sendRtpItem.getMediaServerId();
        MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
        if (mediaServerItem != null) {
            mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
            zlmrtpServerFactory.closeRTPServer(mediaServerItem, sendRtpItem.getStreamId());
        }
        try {
            SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem);
            // NOTE(review): a null request is only logged here; there is no return, so the
            // null value still reaches getNewClientTransaction(...) further down.
            // TODO confirm whether an early return is intended.
            if (byeRequest == null) {
                logger.warn("[向上级发送bye]:无法创建 byeRequest");
            }
            // NOTE(review): from here down to the inner catch chain the code looks like the
            // earlier dialog-based implementation: it deserializes the stored dialog, puts it
            // into the SIP stack, sets private SIPDialog fields via reflection and sends the
            // BYE with dialog.sendRequest(...). It re-declares byeRequest, which cannot
            // coexist with the declaration above — presumably superseded; verify.
            byte[] dialogByteArray = sendRtpItem.getDialog();
            if (dialogByteArray != null) {
                SIPDialog dialog = (SIPDialog) SerializeUtils.deSerialize(dialogByteArray);
                SipStack sipStack;
                if ("TCP".equals(platform.getTransport())) {
                    sipStack = tcpSipProvider.getSipStack();
                } else {
                    sipStack = udpSipProvider.getSipStack();
                }
                // putDialog returns a SIPDialog; when it is a different instance from the
                // deserialized one, the returned instance is used from here on.
                SIPDialog sipDialog = ((SipStackImpl) sipStack).putDialog(dialog);
                if (dialog != sipDialog) {
                    dialog = sipDialog;
                }
                try {
                    if ("TCP".equals(platform.getTransport())) {
                        dialog.setSipProvider(tcpSipProvider);
                    } else {
                        dialog.setSipProvider(udpSipProvider);
                    }
                    // Reflection writes the private "sipStack" and "eventListeners" fields of
                    // the dialog before it is used to create the BYE request.
                    Field sipStackField = SIPDialog.class.getDeclaredField("sipStack");
                    sipStackField.setAccessible(true);
                    sipStackField.set(dialog, sipStack);
                    Field eventListenersField = SIPDialog.class.getDeclaredField("eventListeners");
                    eventListenersField.setAccessible(true);
                    eventListenersField.set(dialog, new HashSet<>());
                    Request byeRequest = dialog.createRequest(Request.BYE);
                    // Point the request URI at the platform's configured server address.
                    SipURI byeURI = (SipURI) byeRequest.getRequestURI();
                    byeURI.setHost(platform.getServerIP());
                    byeURI.setPort(platform.getServerPort());
                    ClientTransaction clientTransaction;
                    if ("TCP".equals(platform.getTransport())) {
                        clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest);
                    } else {
                        clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest);
                    }
                    dialog.sendRequest(clientTransaction);
                } catch (SipException e) {
                    e.printStackTrace();
                } catch (ParseException e) {
                    e.printStackTrace();
                } catch (NoSuchFieldException e) {
                    e.printStackTrace();
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            // Dialog-free path: the transport check is an exact, case-sensitive match on
            // "TCP"; any other value (including null) selects the UDP provider.
            ClientTransaction clientTransaction;
            if ("TCP".equals(platform.getTransport())) {
                clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest);
            } else {
                clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest);
            }
            clientTransaction.sendRequest();
        } catch (SipException e) {
            // SIP and parse failures are printed to stderr and otherwise ignored.
            e.printStackTrace();
        } catch (ParseException e) {
            e.printStackTrace();
        } catch (InvalidArgumentException e) {
            // Unlike the two cases above, this one propagates to the caller as unchecked.
            throw new RuntimeException(e);
        }
    }
}