Потребитель Kafka не потребляет, если первый брокер не работает

Я использую последнюю версию kafka(kafka_2.12-1.0.0.tgz). Я установил простой кластер с 3 брокерами (только что изменили broker.id=1 и listeners=PLAINTEXT://:9092 в файле свойств для каждого экземпляра). После того, как кластер запущен, я создал тему с помощью следующей команды

./kafka-topics.sh --create    --zookeeper localhost:2181  --replication-factor 3     --partitions 13    --topic demo

затем запустите kafka потребителей и производителей с помощью следующих команд

./kafka-console-producer.sh --topic  demo  --broker-list localhost:9094,localhost:9093,localhost:9092

./kafka-console-consumer.sh --group test --bootstrap-server localhost:9094,localhost:9093,localhost:9092  --topic demo

Все в порядке, когда все брокеры. Но если я сначала убью (по порядку запуска) сообщения брокера, они отправляются брокерам, но потребитель не может получить никакого сообщения. Сообщения не теряются. После запуска этого брокера потребитель сразу получает сообщение.

Журналы потребителя после закрытия экземпляра брокера:

[2018-01-09 13: 39: 31,130] WARN [Consumer clientId=consumer-1, groupId=test] Не удалось установить соединение с узлом 2147483646. Брокер может быть недоступен. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,132] WARN [Consumer clientId=consumer-1, groupId=test] Соединение с узлом 1 не может быть установлено. Брокер может быть недоступен. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,344] ПРЕДУПРЕЖДЕНИЕ [Consumer clientId=consumer-1, groupId=test] Не удалось установить соединение с узлом 2147483646. Брокер может быть недоступен. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,451] WARN [Consumer clientId=consumer-1, groupId=test] Соединение с узлом 1 не может быть установлено. Брокер может быть недоступен. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,848] WARN [Consumer clientId=consumer-1, groupId=test] Не удалось установить соединение с узлом 2147483646. Брокер может быть недоступен. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,950] WARN [Consumer clientId=consumer-1, groupId=test] Не удалось установить соединение с узлом 1. Брокер может быть недоступен. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:32,363] WARN [Consumer clientId=consumer-1, groupId=test] Не удалось установить соединение с узлом 2147483646. Брокер может быть недоступен. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:33,092] WARN [Consumer clientId=consumer-1, groupId=test] Не удалось установить соединение с узлом 2147483646. Брокер может быть недоступен. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:34,216] ПРЕДУПРЕЖДЕНИЕ [Consumer clientId=consumer-1, groupId=test] Не удалось установить соединение с узлом 2147483646. Брокер может быть недоступен. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:34,218] WARN [Потребительский clientId = потребитель-1, groupId = тест] Асинхронная автоматическая фиксация смещений {demo-0 = OffsetAndMetadata {смещение = 3, метаданные =''}, demo-1 = OffsetAndMetadata {offset = 3, metadata =''}, demo-2 = OffsetAndMetadata {offset = 2, metadata =''}, demo-3 = OffsetAndMetadata {offset = 2, metadata =''}, demo-4 = OffsetAndMetadata {offset = 1, metadata =''}, demo-5 = OffsetAndMetadata {offset = 1, metadata =''}, demo-6 = OffsetAndMetadata {offset = 3, метаданные =''}, demo-7=OffsetAndMetadata{offset = 2, metadata =''}, demo-8=OffsetAndMetadata{offset = 3, metadata =''}, demo-9=OffsetAndMetadata{offset =2, metadata=''}, demo-10=OffsetAndMetadata{offset=3, metadata=''}, demo-11=OffsetAndMetadata{offset=2, metadata=''}, demo-12=OffsetAndMetadata{offset=2, metadata =''}} failed: смещение фиксации не удалось с повторяющимся исключением. Вы должны повторить попытку смещения. Основной ошибкой было: координатор недоступен. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-01-09 13: 39: 34,219] WARN [Consumer clientId=consumer-1, groupId=test] Не удалось установить соединение с узлом 1. Брокер может быть недоступен. (Org.apache.kafka.clients.NetworkClient)

Журнал потребителя после повторного запуска пропавшего брокера:

[2018-01-09 13: 41: 21,739] ОШИБКА [Consumer clientId=consumer-1, groupId=test] Сбой при фиксации смещения на демо-0 раздела со смещением 3: это неверный координатор. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-01-09 13: 41: 21,739] WARN [Consumer clientId=consumer-1, groupId=test] Асинхронная автоматическая фиксация смещений {demo-0=OffsetAndMetadata{offset=3, metadata=''}, demo-1=OffsetAndMetadata{offset=3, metadata=''}, demo-2=OffsetAndMetadata{offset=2, metadata=''}, demo-3=OffsetAndMetadata{offset=2, metadata=''}, demo-4=OffsetAndMetadata{offset=1, metadata=''}, demo-5=OffsetAndMetadata{offset=1, metadata=''}, demo-6=OffsetAndMetadata{offset=3, метаданные =''}, демо-7=OffsetAndMetadata{смещение = 2, метаданные =''}, демо-8=OffsetAndMetadata{смещение = 3, метаданные =''}, демо-9=OffsetAndMetadata{смещение = 2, metadata =''}, demo-10 = OffsetAndMetadata {offset = 3, metadata =''}, demo-11 = OffsetAndMetadata {offset = 2, metadata =''}, demo-12 = OffsetAndMetadata {offset = 2, метаданные = ''}} не удалось: сбой фиксации с повторяющимся исключением. Вы должны повторить попытку смещения. Основная ошибка была: это не правильный координатор. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-01-09 13:41:22,353] ОШИБКА [Consumer clientId=consumer-1, groupId=test] Сбой при фиксации смещения на демо-0 раздела со смещением 3: это не правильный координатор. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-01-09 13:41:22,354] WARN [Consumer clientId=consumer-1, groupId=test] Асинхронная автоматическая фиксация смещений {demo-0=OffsetAndMetadata{offset=3, metadata=''}, demo-1=OffsetAndMetadata{offset=3, metadata=''}, demo-2=OffsetAndMetadata{offset=2, metadata=''}, demo-3=OffsetAndMetadata{offset=2, metadata=''}, demo-4=OffsetAndMetadata{offset=1, metadata=''}, demo-5=OffsetAndMetadata{offset=1, metadata=''}, demo-6=OffsetAndMetadata{offset=3, метаданные =''}, демо-7=OffsetAndMetadata{смещение = 2, метаданные =''}, демо-8=OffsetAndMetadata{смещение = 3, метаданные =''}, демо-9=OffsetAndMetadata{смещение =2, metadata=''}, demo-10=OffsetAndMetadata{offset=3, metadata=''}, demo-11=OffsetAndMetadata{offset=3, metadata=''}, demo-12=OffsetAndMetadata{offset=2, метаданные = ''}} не удалось: сбой фиксации с повторяющимся исключением. Вы должны повторить попытку смещения. Основная ошибка была: это не правильный координатор. (Org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

Спасибо

1 ответ

Решение

Попробуйте проверить "offsets.topic.replication.factor" в файле server-*. Properties

Например:

############################# Internal Topic Settings       
# The replication factor for the group metadata internal topics    
# For anything other than development testing, a value greater than 1 is  recommended for to ensure availability such as 3.
offsets.topic.replication.factor=3

http://kafka.apache.org/documentation/

С помощью KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR на yml файл решить эту проблему.

Например, используя 2 рабочих на docker-swarm,

environment:
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
Другие вопросы по тегам