zxl
2026-03-25 0b39edb68acc67ed01fbfe5d31bfa776a1b17de1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package com.tievd.jyz.config.mqtt;
 
import com.tievd.jyz.mqtt.MqttMsgReceiver;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
 
import java.util.Objects;
 
/**
 *mqtt配置
 * @author timi
 */
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttConfig {
    private static final String OUTBOUND_CHANNEL = "mqttOutboundChannel";
 
    private static final String INPUT_CHANNEL = "mqttInputChannel";
    /**
     * 接收指令消息
     */
    private static final String COMMAND_TOPIC = "/ty/center/command/";
    /**
     * 接收事件消息
     */
    private static final String EVENT_TOPIC = "/ty/center/event/";
 
    @Value("${mqtt.username}")
    private String username;
 
    @Value("${mqtt.password}")
    private String password;
 
    @Value("${mqtt.serverURIs}")
    private String hostUrl;
 
    @Value("${mqtt.client.id}")
    private String clientId;
 
    @Value("${mqtt.topic}")
    private String defaultTopic;
 
    @Value("${mqtt.completionTimeout}")
    private String completionTimeout;
 
    @Value("${mqtt.reciveTopiceName}")
    private String reciveTopiceName;
 
    @Autowired
    private MqttMsgReceiver mqttMsgReceiver;
 
    @Bean
    public MqttPahoClientFactory clientFactory() {
        final MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{hostUrl});
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setAutomaticReconnect(true);
        //不清理队列,保证AR发送的数据都不会丢弃
        options.setCleanSession(false);
        options.setMaxInflight(2000);
        final DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }
 
    @Bean(value = OUTBOUND_CHANNEL)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
 
    @Bean
    @ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
    public MessageHandler mqttOutbound() {
        final MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, clientFactory());
        handler.setDefaultQos(1);
        handler.setDefaultRetained(false);
        handler.setDefaultTopic(defaultTopic);
        handler.setAsync(false);
        handler.setAsyncEvents(false);
        return handler;
    }
 
    /**
     * MQTT消息接收处理
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
 
    /**
     * 配置client,监听的topic
     * @return
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        clientId + "_inbound", clientFactory(), reciveTopiceName.split(","));
        adapter.setCompletionTimeout(Long.valueOf(completionTimeout));
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
 
    /**
     * 通过通道获取数据
     */
    @Bean
    @ServiceActivator(inputChannel = INPUT_CHANNEL)
    public MessageHandler handler() {
        return message -> {
            String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
            log.info("topic: {}", topic);
            String msg = message.getPayload().toString();
            log.debug("payload: {}", msg);
            if(topic.contains(COMMAND_TOPIC)){
                mqttMsgReceiver.dealCommandMsg(msg);
            }else if(topic.contains(EVENT_TOPIC)){
                mqttMsgReceiver.dealEventMsg(msg);
            }
        };
    }
}