Spring Integration: Основы тестирования интеграции MQTT
Я пытаюсь протестировать простой слушатель MQTT, созданный с помощью Spring Boot и Spring Integration, но у меня не получается. Я перепробовал много подходов. Наиболее перспективным было:
@RunWith(SpringRunner.class)
@SpringBootTest
public class BasicMqttTest {
@Value("${mqtt.client.id}")
private String mqttClientId;
@Value("${mqtt.state.topic}")
private String mqttTopic;
@Autowired
MqttPahoClientFactory mqttPahoClientFactory;
protected IMqttClient client;
@Before
public void setUp() throws Exception {
client = mqttPahoClientFactory.getClientInstance(mqttPahoClientFactory.getConnectionOptions().getServerURIs()[0], mqttClientId);
client.connect();
}
@After
public void tearDown() throws Exception {
client.disconnect();
client.close();
}
@Test
public void contextLoads() throws Exception {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload("MQTT!".getBytes());
client.publish(mqttTopic, mqttMessage);
}
}
Тем не менее, тест выполняется с 2018-07-12 16:53:50.937 ERROR 21160 --- [T Rec: consumer] .m.i.MqttPahoMessageDrivenChannelAdapter : Lost connection: Verbindung wurde getrennt; retrying...
и я не вижу ничего распечатанного. Код в основном из: https://github.com/spring-projects/spring-integration-samples/tree/master/basic/mqtt. Пример работает хорошо, но мне нужно уметь писать правильные интеграционные тесты.
Конфигурация:
@Value("${mqtt.server.uri}")
private String mqttServerUri;
@Value("${mqtt.username}")
private String mqttUsername;
@Value("${mqtt.password}")
private String mqttPassword;
@Value("${mqtt.client.id}")
private String mqttClientId;
@Value("${mqtt.state.topic}")
private String mqttTopic;
@Value("${mqtt.completion.timeout}")
private Integer mqttCompletionTimeout;
@Value("${mqtt.quality.of.service}")
private Integer mqttQualityOfService;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{mqttServerUri});
options.setUserName(mqttUsername);
options.setPassword(mqttPassword.toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@Qualifier("MqttInboundChannel")
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
mqttClientId,
mqttClientFactory(),
mqttTopic
);
adapter.setCompletionTimeout(mqttCompletionTimeout);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(mqttQualityOfService);
return adapter;
}
@Bean
public IntegrationFlow myMqttInFlow() {
return IntegrationFlows.from(mqttInbound)
.handle(message -> {
System.out.println(message);
}).get();
}
ОБНОВИТЬ:
Также не работал:
@Autowired
protected MessageHandler mqttOutbound;
@Test
public void test0() throws Exception {
mqttOutbound.handleMessage(MessageBuilder.withPayload("test").build());
}
@SpringBootApplication
static class MqttSourceApplication {
@Autowired
private MqttPahoClientFactory mqttClientFactory;
@Bean
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("test", mqttClientFactory);
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("test");
messageHandler.setConverter(pahoMessageConverter());
return messageHandler;
}
@Bean
public DefaultPahoMessageConverter pahoMessageConverter() {
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(1, false, "UTF-8");
return converter;
}
}
ОБНОВИТЬ
Еще проще... та же ошибка:
@Autowired
MqttPahoClientFactory mqttPahoClientFactory;
private MessageHandler mqttOutbound;
@Before
public void setUp() throws Exception {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttClientId, mqttPahoClientFactory);
messageHandler.setAsync(false);
messageHandler.setDefaultTopic(mqttTopic);
messageHandler.setConverter(new DefaultPahoMessageConverter());
mqttOutbound = messageHandler;
}
@Test
public void test0() throws Exception {
mqttOutbound.handleMessage(MessageBuilder.withPayload("test").build());
Thread.sleep(10000L);
}
1 ответ
Хорошо, решено: Paho, очевидно, закрыл мое соединение, так как и test, и main использовали один и тот же идентификатор клиента. Решением было заменить clientId на MqttAsyncClient.generateClientId()
как предложено здесь: /questions/38911654/spring-mqttpahomessagedrivenchanneladapter-poteryannoe-soedinenie-soedinenie-poteryano-povtornaya-popyitka/38911668#38911668