package com.tievd.jyz.config.mqtt; import com.tievd.jyz.mqtt.MqttMsgReceiver; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import java.util.Objects; /** *mqtt配置 * @author timi */ @Slf4j @Configuration @IntegrationComponentScan public class MqttConfig { private static final String OUTBOUND_CHANNEL = "mqttOutboundChannel"; private static final String INPUT_CHANNEL = "mqttInputChannel"; /** * 接收指令消息 */ private static final String COMMAND_TOPIC = "/ty/center/command/"; /** * 接收事件消息 */ private static final String EVENT_TOPIC = "/ty/center/event/"; @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.serverURIs}") private String hostUrl; @Value("${mqtt.client.id}") private String clientId; @Value("${mqtt.topic}") private String defaultTopic; @Value("${mqtt.completionTimeout}") private String completionTimeout; @Value("${mqtt.reciveTopiceName}") private String reciveTopiceName; @Autowired private MqttMsgReceiver mqttMsgReceiver; @Bean public MqttPahoClientFactory clientFactory() { final MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[]{hostUrl}); options.setUserName(username); options.setPassword(password.toCharArray()); options.setAutomaticReconnect(true); //不清理队列,保证AR发送的数据都不会丢弃 options.setCleanSession(false); options.setMaxInflight(2000); final DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(options); return factory; } @Bean(value = OUTBOUND_CHANNEL) public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @Bean @ServiceActivator(inputChannel = OUTBOUND_CHANNEL) public MessageHandler mqttOutbound() { final MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, clientFactory()); handler.setDefaultQos(1); handler.setDefaultRetained(false); handler.setDefaultTopic(defaultTopic); handler.setAsync(false); handler.setAsyncEvents(false); return handler; } /** * MQTT消息接收处理 */ @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * 配置client,监听的topic * @return */ @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( clientId + "_inbound", clientFactory(), reciveTopiceName.split(",")); adapter.setCompletionTimeout(Long.valueOf(completionTimeout)); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * 通过通道获取数据 */ @Bean @ServiceActivator(inputChannel = INPUT_CHANNEL) public MessageHandler handler() { return message -> { String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString(); log.info("topic: {}", topic); String msg = message.getPayload().toString(); log.debug("payload: {}", msg); if(topic.contains(COMMAND_TOPIC)){ mqttMsgReceiver.dealCommandMsg(msg); }else if(topic.contains(EVENT_TOPIC)){ mqttMsgReceiver.dealEventMsg(msg); } }; } }