Как правильно использовать @EmbeddedKafka для IntegrationTests?
Я прочитал документацию по spring-kafka, примеры, которые я нашел, и половину стека overflow, но я не понимаю, кто должен работать @EmbeddedKafka. Специально для интеграционных тестов.
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@EnableKafka
@EmbeddedKafka(controlledShutdown = true
// brokerProperties = {"log.dir=${kafka.broker.logs-dir}",
// "listeners=PLAINTEXT://localhost:${kafka.broker.port:3333}",
// "auto.create.topics.enable=${kafka.broker.topics-enable:true}"}
)
и мне пришлось добавить в test.properties следующее:
….kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
со всем этим, когда тест выполняется, он жалуется на несколько строк о недоступности kafka:
[Producer clientId=producer-2] Connection to node 0 could not be established. Broker may not be available.
но после половины экрана оно продолжается, тест завершается успешно и завершается с последним неприятным сообщением:
2019-06-17 12:31:54.438 WARN [testing,,,] 32448 --- [ost-startStop-1] o.a.c.loader.WebappClassLoaderBase : The web application [testing] appears to have started a thread named [kafka-producer-network-thread | producer-3] but has failed to stop it. This is very likely to create a memory leak. Stack trace of thread:
sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
org.apache.kafka.common.network.Selector.select(Selector.java:674)
org.apache.kafka.common.network.Selector.poll(Selector.java:396)
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
java.lang.Thread.run(Thread.java:748)
вопросы: 1/ почему в начале каждого метода появляются сообщения о недоступности брокера, даже если контекст не был грязным и повторно инициализированным? 2/ почему уродливое сообщение с незакрытой веткой? В чем дело?