648540858
2022-03-03 2eb1ca2d94a09c2d3ced69de28de72d2d6d77d8e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.RequestEvent;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import java.util.HashMap;
import java.util.Map;
 
/**
 * SIP命令类型: ACK请求
 */
@Component
public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
 
    private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class);
    private String method = "ACK";
 
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
 
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
        sipProcessorObserver.addRequestProcessor(method, this);
    }
 
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
 
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
 
    @Autowired
    private IMediaServerService mediaServerService;
 
 
    /**   
     * 处理  ACK请求
     * 
     * @param evt
     */
    @Override
    public void process(RequestEvent evt) {
        Dialog dialog = evt.getDialog();
        if (dialog == null) return;
        if (dialog.getState()== DialogState.CONFIRMED) {
            String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
            String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
            SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, channelId);
            String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
            String deviceId = sendRtpItem.getDeviceId();
            StreamInfo streamInfo = null;
            if (deviceId == null) {
                streamInfo = new StreamInfo();
                streamInfo.setApp(sendRtpItem.getApp());
                streamInfo.setStream(sendRtpItem.getStreamId());
            }else {
                streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
                sendRtpItem.setStreamId(streamInfo.getStream());
                streamInfo.setApp("rtp");
            }
 
            redisCatchStorage.updateSendRTPSever(sendRtpItem);
            logger.info(platformGbId);
            logger.info(channelId);
            Map<String, Object> param = new HashMap<>();
            param.put("vhost","__defaultVhost__");
            param.put("app",streamInfo.getApp());
            param.put("stream",streamInfo.getStream());
            param.put("ssrc", sendRtpItem.getSsrc());
            param.put("dst_url",sendRtpItem.getIp());
            param.put("dst_port", sendRtpItem.getPort());
            param.put("is_udp", is_Udp);
            //param.put ("src_port", sendRtpItem.getLocalPort());
            // 设备推流查询,成功后才能转推
            boolean rtpPushed = false;
            long startTime = System.currentTimeMillis();
            while (!rtpPushed) {
                try {
                    if (System.currentTimeMillis() - startTime < 30 * 1000) {
                        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                        if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStream())) {
                            rtpPushed = true;
                            logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]",
                                    streamInfo.getApp() ,streamInfo.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
                            zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
                        } else {
                            logger.info("等待设备推流[{}/{}].......",
                                    streamInfo.getApp() ,streamInfo.getStream());
                            Thread.sleep(1000);
                            continue;
                        }
                    } else {
                        rtpPushed = true;
                        logger.info("设备推流[{}/{}]超时,终止向上级推流",
                                streamInfo.getApp() ,streamInfo.getStream());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}