Встроенные тесты Кафки (запускаемые SBT) периодически терпят неудачу с ошибками ZooKeeperServer
Я пишу группу тестов с использованием весеннего утилиты тестирования KafkaEmbedded. Каждый из тестов по отдельности поддерживает встроенный экземпляр kafka, генерирует события и утверждает результирующие последующие события.
Тесты последовательно проходят при запуске в IDE (например, IntelliJ), однако тесты периодически сбои (примерно 50% времени, без регулярности) при запуске с SBT. В случае сбоя теста я вижу следующую ошибку:
o.a.zookeeper.server.ZooKeeperServer - ZKShutdownHandler is not registered, so ZooKeeper server won't take any action on ERROR or SHUTDOWN server state changes
Кроме того, я вижу много журналов INFO, которые сообщают об отсутствующих узлах ZooKeeper, например:
o.a.z.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x164a50fe9fd0001 type:create cxid:0x5 zxid:0x4 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NoNode for /brokers
Эти журналы НЕ появляются при успешных тестах. Когда я говорю "Я вижу много журналов INFO", я имею в виду МНОГО, около 40 таких журналов, причем некоторые пути узлов вложены в ранее сообщенные пути узлов.
Исследования показывают, что журнал ошибок невиновен, и я хотел бы думать, что журналы информации тоже, но они одиноки при неудачном тестировании.
Обновление 7/17:
KStreams / Производители / Потребители Конфигурации:
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapUrls);
properties.put(StreamsConfig.STATE_DIR_CONFIG, String.format("/tmp/kafka-streams/%s/%s",
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.CLIENT_ID_CONFIG, appName);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, appName);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.cla
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
Встроенный запуск Kafka:
@Rule
public KafkaEmbedded kafka = new KafkaEmbedded(getNumKafkaServers(), true, getNumPartitionsPerTopic(), getTopics().keySet().toArray(new String[0]));
Тест перед:
@Before
public void before() {
// build KStreams and start topology
KStreamBuilder kStreamBuilder = new KStreamBuilder();
buildStream(kStreamBuilder);
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
}
Тест после:
@After
public void after() {
kafka.destroy();
FileUtils.deleteDirectory(new File(streamsConfig.getString(StreamsConfig.STATE_DIR_CONFIG)));
}
Обновление 7/17:
Более подробно, это проект Spring, и каждый тест помечается так:
@SpringBootTest(classes = <this-test-class>.class)
@ActiveProfiles("test")
@RunWith(SpringRunner.class)