Встроенные тесты Кафки (запускаемые SBT) периодически терпят неудачу с ошибками ZooKeeperServer

Я пишу группу тестов с использованием весеннего утилиты тестирования KafkaEmbedded. Каждый из тестов по отдельности поддерживает встроенный экземпляр kafka, генерирует события и утверждает результирующие последующие события.

Тесты последовательно проходят при запуске в IDE (например, IntelliJ), однако тесты периодически сбои (примерно 50% времени, без регулярности) при запуске с SBT. В случае сбоя теста я вижу следующую ошибку:

o.a.zookeeper.server.ZooKeeperServer - ZKShutdownHandler is not registered, so ZooKeeper server won't take any action on ERROR or SHUTDOWN server state changes

Кроме того, я вижу много журналов INFO, которые сообщают об отсутствующих узлах ZooKeeper, например:

o.a.z.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x164a50fe9fd0001 type:create cxid:0x5 zxid:0x4 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NoNode for /brokers

Эти журналы НЕ появляются при успешных тестах. Когда я говорю "Я вижу много журналов INFO", я имею в виду МНОГО, около 40 таких журналов, причем некоторые пути узлов вложены в ранее сообщенные пути узлов.

Исследования показывают, что журнал ошибок невиновен, и я хотел бы думать, что журналы информации тоже, но они одиноки при неудачном тестировании.

Обновление 7/17:

KStreams / Производители / Потребители Конфигурации:

Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapUrls);
properties.put(StreamsConfig.STATE_DIR_CONFIG, String.format("/tmp/kafka-streams/%s/%s",
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.CLIENT_ID_CONFIG, appName);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, appName);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.cla
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

Встроенный запуск Kafka:

@Rule
public KafkaEmbedded kafka = new KafkaEmbedded(getNumKafkaServers(), true, getNumPartitionsPerTopic(), getTopics().keySet().toArray(new String[0]));

Тест перед:

@Before
public void before() {
    // build KStreams and start topology
    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    buildStream(kStreamBuilder);
    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
    kafkaStreams.start();
}

Тест после:

@After
public void after() {
    kafka.destroy();
    FileUtils.deleteDirectory(new File(streamsConfig.getString(StreamsConfig.STATE_DIR_CONFIG)));
}

Обновление 7/17:

Более подробно, это проект Spring, и каждый тест помечается так:

@SpringBootTest(classes = <this-test-class>.class)
@ActiveProfiles("test")
@RunWith(SpringRunner.class)

0 ответов

Другие вопросы по тегам