Как проверить, готов ли Kafka Consumer

У меня есть политика фиксации Kafka, установленная на последние и пропущенные первые несколько сообщений. Если перед отправкой сообщений в тему ввода я поспал 20 секунд, все работает как надо. Я не уверен, что проблема в том, что потребителю требуется много времени для перебалансировки раздела. Есть ли способ узнать, готов ли потребитель, прежде чем начать опрос?

7 ответов

  • Ты можешь использовать consumer.assignment(), он вернет набор разделов и проверит, назначены ли все разделы, доступные для этой темы.

  • Если вы используете проект spring-kafka, вы можете включить зависимость spring-kafka-test и использовать метод ниже, чтобы дождаться назначения темы, но у вас должен быть контейнер.ContainerTestUtils.waitForAssignment(Object container, int partitions);

Вы можете сделать следующее:

У меня есть тест, который читает данные из темы Кафка.
Таким образом, вы не можете использовать KafkaConsumer в многопоточной среде, но вы можете передать параметр "Назначение AtomicReference", обновить его в потоке потребителей и прочитать его в другом потоке.

Например, отрывок рабочего кода в проекте для тестирования:

    private void readAvro(String readFromKafka,
                      AtomicBoolean needStop,
                      List<Event> events,
                      String bootstrapServers,
                      int readTimeout) {
    // print the topic name
    AtomicReference<Set<TopicPartition>> assignment = new AtomicReference<>();
    new Thread(() -> readAvro(bootstrapServers, readFromKafka, needStop, events, readTimeout, assignment)).start();

    long startTime = System.currentTimeMillis();
    long maxWaitingTime = 30_000;
    for (long time = System.currentTimeMillis(); System.currentTimeMillis() - time < maxWaitingTime;) {
        Set<TopicPartition> assignments = Optional.ofNullable(assignment.get()).orElse(new HashSet<>());
        System.out.println("[!kafka-consumer!] Assignments [" + assignments.size() + "]: "
                + assignments.stream().map(v -> String.valueOf(v.partition())).collect(Collectors.joining(",")));
        if (assignments.size() > 0) {
            break;
        }
        try {
            Thread.sleep(1_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            needStop.set(true);
            break;
        }
    }
    System.out.println("Subscribed! Wait summary: " + (System.currentTimeMillis() - startTime));
}

private void readAvro(String bootstrapServers,
                      String readFromKafka,
                      AtomicBoolean needStop,
                      List<Event> events,
                      int readTimeout,
                      AtomicReference<Set<TopicPartition>> assignment) {

    KafkaConsumer<String, byte[]> consumer = (KafkaConsumer<String, byte[]>) queueKafkaConsumer(bootstrapServers, "latest");
    System.out.println("Subscribed to topic: " + readFromKafka);
    consumer.subscribe(Collections.singletonList(readFromKafka));

    long started = System.currentTimeMillis();
    while (!needStop.get()) {
        assignment.set(consumer.assignment());
        ConsumerRecords<String, byte[]> records = consumer.poll(1_000);
        events.addAll(CommonUtils4Tst.readEvents(records));

        if (readTimeout == -1) {
            if (events.size() > 0) {
                break;
            }
        } else if (System.currentTimeMillis() - started > readTimeout) {
            break;
        }
    }

    needStop.set(true);

    synchronized (MainTest.class) {
        MainTest.class.notifyAll();
    }
    consumer.close();
}

PS
needStop - глобальный флаг, чтобы остановить все запущенные потоки, если таковые имеются в случае неудачи успеха
events - список объектов, которые я хочу проверить
readTimeout - сколько времени мы будем ждать, пока прочитаем все данные, если readTimeout == -1, то остановимся, когда мы что-нибудь прочитаем

Благодаря Алексею (я тоже проголосовал), я, кажется, решил свою проблему, по сути, следуя той же идее.

Просто хочу поделиться своим опытом... в нашем случае мы используем Kafka в форме запросов и ответов, что-то вроде RPC. Запрос отправляется по одной теме, а затем ожидает ответа по другой теме. Встречается с подобной проблемой, т.е. пропускает первый ответ.

я пытался ... KafkaConsumer.assignment(); неоднократно (с Thread.sleep(100);) но, похоже, не помогает. Добавление KafkaConsumer.poll(50); Кажется, что он заполнил потребителя (группу) и тоже получил первый ответ. Протестировано несколько раз, и теперь оно стабильно работает.

Кстати, тестирование требует остановки приложения и удаления тем Kafka, а также, по сути, перезапуска Kafka.

PS: просто звоню poll(50); без assignment(); Логика извлечения, как упоминал Алексей, не может гарантировать, что потребитель (группа) готов.

Вы можете изменить AlwaysSeekToEndListener (слушает только новые сообщения), чтобы включить обратный вызов:

      public class AlwaysSeekToEndListener<K, V> implements ConsumerRebalanceListener {
    private final Consumer<K, V> consumer;
    private Runnable callback;

    public AlwaysSeekToEndListener(Consumer<K, V> consumer) {
        this.consumer = consumer;
    }

    public AlwaysSeekToEndListener(Consumer<K, V> consumer, Runnable callback) {
        this.consumer = consumer;
        this.callback = callback;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        consumer.seekToEnd(partitions);
        if (callback != null) {
            callback.run();
        }
    }
}

и подпишитесь с помощью обратного вызова защелки:

      CountDownLatch initLatch = new CountDownLatch(1);

consumer.subscribe(singletonList(topic), new AlwaysSeekToEndListener<>(consumer, () -> initLatch.countDown()));

initLatch.await(); // blocks until consumer is ready and listening

затем приступайте к запуску вашего продюсера.

Мне нужно было знать, готов ли потребитель kafka, прежде чем проводить какое-либо тестирование, поэтому я попытался использовать Consumer.assignment() , но он вернул только набор назначенных разделов, но возникла проблема, с этим я не могу увидеть, назначены ли эти разделы для группы было установлено смещение, поэтому позже, когда я попытался использовать потребителя, смещение не было установлено правильно.

Решения заключались в использовании commit() , это даст вам последние зафиксированные смещения заданных разделов, которые вы указали в аргументах.

Итак, вы можете сделать что-то вроде: consumer.committed(consumer.assignment())

Если еще нет назначенных разделов, он вернет:

      {}

Если есть назначенные разделы, но еще нет смещения:

      {name.of.topic-0=null, name.of.topic-1=null}

Но если есть разделы и смещение:

      {name.of.topic-0=OffsetAndMetadata{offset=5197881, leaderEpoch=null, metadata=''}, name.of.topic-1=OffsetAndMetadata{offset=5198832, leaderEpoch=null, metadata=''}}

С этой информацией вы можете использовать что-то вроде:

      consumer.committed(consumer.assignment()).isEmpty();
consumer.committed(consumer.assignment()).containsValue(null);

И с этой информацией вы можете быть уверены, что потребитель кафки готов.

Если ваша политика установлена ​​на последнее - что вступает в силу, если нет ранее зафиксированных смещений - но у вас нет ранее зафиксированных смещений, то вам не следует беспокоиться об "пропущенных" сообщениях, потому что вы говорите Kafka не заботиться о сообщениях, которые были отправлены "ранее" вашим потребителям в готовом виде.

Если вы заботитесь о "предыдущих" сообщениях, вы должны установить политику как можно раньше.

В любом случае, какой бы ни была политика, поведение, которое вы видите, является временным, т.е. как только зафиксированные смещения сохраняются в Kafka, при каждом перезапуске потребители выбирают то, что они оставили ранее.

Я столкнулся с подобной проблемой во время тестирования EmbeddedKafka.

Отказ от ответственности. Мой подход, возможно, не похож на «путь Кафки», но он позволяет выполнять работу с учетом некоторых компромиссов. И, конечно, его нельзя использовать нигде, кроме тестов.

В целом тест состоит из следующих шагов:

  1. Создать потребителя
  2. Напишите сообщение в тему
  3. Ожидайте, что было использовано единственное конкретное сообщение

Поэтому я ищуauto.offset.reset=latestсемантический с гарантиями того, что назначенная тема готова к опросу. В конце я решил использовать специальное сообщение, чтобы отметить, что потребитель готов:

      public class ConsumerHelper {
    
    public static KafkaConsumer<String, Object> buildConsumer(EmbeddedKafkaBroker broker, Set<String> topics) {
        var consumer = buildConsumer(broker);
        if (!CollectionUtils.isEmpty(topics)) {
            var producer = buildUtilProducer(...);
            var key = "util-message-key" + UUID.randomUUID(); //key must be unique for every method call
            topics.forEach(
                    topic -> producer.send(new ProducerRecord<>(topic, key, new Object()))
            );
            var uncheckedTopics = new HashSet<>(topics);
            consumer.subscribe(topics);
            do {
                consumer.poll(Duration.ofMillis()).forEach(record -> {
                    if (key.equals(record.getKey())) {
                        uncheckedTopics.remove(record.topic())
                    }
                });
                consumer.commitSync()
            } while (!uncheckedTopics.isEmpty() /* you may add some timeout check logic here if needed */)
        }
        return consumer;

    }


    /**
     * consumer builder method, e.g. with KafkaTestUtils
     *
     * @implSpec consumer group id must be unique, {@code auto.offset.reset} must be setted to {@code earliest}
     */
    private static KafkaConsumer<String, Object> buildConsumer(EmbeddedKafkaBroker broker) {
        var randomGroupId = "group-id-" + UUID.randomUUID(); //consumer group id must be unique
        var consumerProps = KafkaTestUtils.consumerProps(randomGroupId, "true", broker);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //this is important
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserilizer.class);
        //some extra consumer props if needed
        //...
        //
        return new KafkaConsumer<>(consumerProps);

    }

    /**
     * util producer builder method, e.g. with KafkaTestUtils
     */
    private static KafkaConsumer<String, Object> buildUtilProducer() {
        //...
    }

}

В конце концов, KafkaConsumer, созданный с помощью общедоступного метода, готов немедленно принимать новые сообщения.

Очевидное ограничение: тесты не должны запускаться одновременно.

Другие вопросы по тегам