package com.tievd.jyz.config.mqtt;
|
|
import com.tievd.jyz.mqtt.MqttMsgReceiver;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Configuration;
|
import org.springframework.integration.annotation.IntegrationComponentScan;
|
import org.springframework.integration.annotation.ServiceActivator;
|
import org.springframework.integration.channel.DirectChannel;
|
import org.springframework.integration.core.MessageProducer;
|
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
import org.springframework.messaging.MessageChannel;
|
import org.springframework.messaging.MessageHandler;
|
|
import java.util.Objects;
|
|
/**
|
*mqtt配置
|
* @author timi
|
*/
|
@Slf4j
|
@Configuration
|
@IntegrationComponentScan
|
public class MqttConfig {
|
private static final String OUTBOUND_CHANNEL = "mqttOutboundChannel";
|
|
private static final String INPUT_CHANNEL = "mqttInputChannel";
|
/**
|
* 接收指令消息
|
*/
|
private static final String COMMAND_TOPIC = "/ty/center/command/";
|
/**
|
* 接收事件消息
|
*/
|
private static final String EVENT_TOPIC = "/ty/center/event/";
|
|
@Value("${mqtt.username}")
|
private String username;
|
|
@Value("${mqtt.password}")
|
private String password;
|
|
@Value("${mqtt.serverURIs}")
|
private String hostUrl;
|
|
@Value("${mqtt.client.id}")
|
private String clientId;
|
|
@Value("${mqtt.topic}")
|
private String defaultTopic;
|
|
@Value("${mqtt.completionTimeout}")
|
private String completionTimeout;
|
|
@Value("${mqtt.reciveTopiceName}")
|
private String reciveTopiceName;
|
|
@Autowired
|
private MqttMsgReceiver mqttMsgReceiver;
|
|
@Bean
|
public MqttPahoClientFactory clientFactory() {
|
final MqttConnectOptions options = new MqttConnectOptions();
|
options.setServerURIs(new String[]{hostUrl});
|
options.setUserName(username);
|
options.setPassword(password.toCharArray());
|
options.setAutomaticReconnect(true);
|
//不清理队列,保证AR发送的数据都不会丢弃
|
options.setCleanSession(false);
|
options.setMaxInflight(2000);
|
final DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
factory.setConnectionOptions(options);
|
return factory;
|
}
|
|
@Bean(value = OUTBOUND_CHANNEL)
|
public MessageChannel mqttOutboundChannel() {
|
return new DirectChannel();
|
}
|
|
@Bean
|
@ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
|
public MessageHandler mqttOutbound() {
|
final MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, clientFactory());
|
handler.setDefaultQos(1);
|
handler.setDefaultRetained(false);
|
handler.setDefaultTopic(defaultTopic);
|
handler.setAsync(false);
|
handler.setAsyncEvents(false);
|
return handler;
|
}
|
|
/**
|
* MQTT消息接收处理
|
*/
|
@Bean
|
public MessageChannel mqttInputChannel() {
|
return new DirectChannel();
|
}
|
|
/**
|
* 配置client,监听的topic
|
* @return
|
*/
|
@Bean
|
public MessageProducer inbound() {
|
MqttPahoMessageDrivenChannelAdapter adapter =
|
new MqttPahoMessageDrivenChannelAdapter(
|
clientId + "_inbound", clientFactory(), reciveTopiceName.split(","));
|
adapter.setCompletionTimeout(Long.valueOf(completionTimeout));
|
adapter.setConverter(new DefaultPahoMessageConverter());
|
adapter.setQos(1);
|
adapter.setOutputChannel(mqttInputChannel());
|
return adapter;
|
}
|
|
/**
|
* 通过通道获取数据
|
*/
|
@Bean
|
@ServiceActivator(inputChannel = INPUT_CHANNEL)
|
public MessageHandler handler() {
|
return message -> {
|
String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
|
log.info("topic: {}", topic);
|
String msg = message.getPayload().toString();
|
log.debug("payload: {}", msg);
|
if(topic.contains(COMMAND_TOPIC)){
|
mqttMsgReceiver.dealCommandMsg(msg);
|
}else if(topic.contains(EVENT_TOPIC)){
|
mqttMsgReceiver.dealEventMsg(msg);
|
}
|
};
|
}
|
}
|