Apache Kafka в Confluent Cloud - несогласованные смещения в разделенной теме и отставание потребителя

Я нахожу странное поведение при использовании Kafka в Confluent Cloud. Я создал тему со значением разделения по умолчанию: 6.

Моя система состоит из приложения Java Producer, которое отправляет сообщение в эту тему, и приложения Kafka Streams, которое читает из него и выполняет операцию для каждого сообщения.

-----------------------          --------            -----------
| Kafka Java Producer |  ---->  | topic | ---->      | KStream |
-----------------------          --------            -----------

На данный момент я запускаю только один экземпляр приложения Kafka Streams, поэтому у группы потребителей есть один участник.

Вот что я заметил:

  1. Производитель отправляет сообщение, и оно записывается в тему события со смещением 0:

  1. Сообщение достигает KStream и обрабатывается правильно, как я вижу в трассировке журнала KStream:

KStream

events.foreach { key, value ->
    logger.info("--------> Processing TimeMetric {}", value)
    //Store in DB

Журнал

[-StreamThread-1] uration$$EnhancerBySpringCGLIB$$e72e3f00 : --------> Processing Event {"...

  1. В лаге потребителей Confluent Cloud я вижу все группы потребителей и их состояние. Там один для KStream называется events-processor-19549050-d8b0-4b39..., Как указывалось ранее, в этой группе только один член (единственный экземпляр KStream). Однако, если показывает, что эта группа находится за одним сообщением в разделе 2. Кроме того, обратите внимание, что текущее смещение, по-видимому, равно 1, а конечное смещение 2):

  1. Если я отправлю другое сообщение в продюсер, оно снова будет записано в теме, но на этот раз со смещением 2 вместо 1:

  1. Сообщение достигает KStream и снова обрабатывается нормально:

[-StreamThread-1] uration$$EnhancerBySpringCGLIB$$e72e3f00 : --------> Processing Event {

  1. И, возвращаясь к отставанию потребителей группы потребителей, оно все еще на одно сообщение позади, все еще с некоторыми странными смещениями (текущий 3, конец 4):

Хотя обработка кажется хорошей, показанное выше состояние не имеет особого смысла. Можете ли вы объяснить причины, по которым:

  1. смещения сообщения увеличиваются +2 вместо +1?
  2. кажется, что группа потребителей отстает на 1 сообщение, хотя она правильно обработала сообщения?

1 ответ

Решение

Для первого вопроса есть две возможности (хотя, читая 2-й вопрос, вы используете транзакции):

  • Если вы не используете семантику " ровно один раз", производитель может отправить более одного сообщения, так как в канале связи нет того, что оно было отправлено ранее. Таким образом, стандартная семантика Kafka по умолчанию может увеличить число смещений>+1 из-за этих дублированных сообщений.

  • Если вы используете семантику или транзакции " точно один раз", каждое событие транзакции записывает отметку в тему для целей внутреннего контроля. Эти отметки отвечают за увеличение +2, так как они также хранятся в теме (но избегаются потребителем). Руководство Confluent по транзакциям также содержит некоторую информацию об этом поведении:

    После того, как производитель инициирует принятие (или прерывание), координатор начинает протокол двухфазного принятия.

    На первом этапе координатор обновляет свое внутреннее состояние до "prepare_commit" и обновляет это состояние в журнале транзакций. Как только это будет сделано, транзакция гарантированно будет совершена, несмотря ни на что.

    Затем координатор начинает фазу 2, где записывает маркеры фиксации транзакции в разделы раздела, которые являются частью транзакции.

    Эти маркеры транзакций не предоставляются приложениям, но используются потребителями в режиме read_committed для фильтрации сообщений от прерванных транзакций и не для возврата сообщений, которые являются частью открытых транзакций (т. Е. Тех, которые находятся в журнале, но не имеют маркер транзакции, связанный с ними).

    Как только маркеры записаны, координатор транзакции помечает транзакцию как "завершенную", и производитель может начать следующую транзакцию.

Вообще говоря, вам не нужно заботиться о числе смещений, так как оно не является исчерпывающим руководством. Например, повторные попытки, дубликаты или метки транзакции делают смещение отличным от того, что вы ожидаете от своего производителя, но вам не следует об этом беспокоиться; Ваши потребители будут, и они будут заботиться только о "реальных" сообщениях.

Что касается вопроса 2, это известная проблема: https://issues.apache.org/jira/browse/KAFKA-6607

Цитируя джира:

Когда тема ввода для приложения Kafka Streams написана с использованием транзакции, Kafka Streams не фиксирует "endOffset", а "endOffset - 1", если он достигает конца темы. Причина - маркер фиксации, который является последним "сообщением" в теме; Потоки фиксируют "смещение последнего обработанного сообщения плюс 1" и не учитывают маркеры фиксации.

Это не проблема правильности, но когда вы проверяете отставание потребителя с помощью bin/kafka-consumer.group.sh, то отставание отображается как 1 вместо 0 - что является правильным с точки зрения инструмента группы потребителей.

Надеюсь, это поможет!

Другие вопросы по тегам