Кластер Kafka теряет сообщения после перезапуска zookeeper

Я запускаю кластер брокеров кафки, использующих Docker (например, 5 брокеров, по одному брокеру на контейнер). Кафка версия 2.12-0.11.0.0, Zookeeper 3.4.10.

Сценарий:

  • Запуск первого брокера с конфигом ниже

zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5

dataDir=/opt/zookeeper/data

clientPort=2181
maxClientCnxns=10
minSessionTimeout=4000
maxSessionTimeout=1000000
server.1=0.0.0.0:2888:3888

server.properties

broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://broker1_IP:broker1_PORT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.initial.rebalance.delay.ms=0

producer.properties

bootstrap.servers=localhost:9092
compression.type=none

consumer.properties

zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.id=test-consumer-group
  • Zookeeper запускается в автономном режиме, затем запускается кафка

  • Создание темы

/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-test-topic1

  • Отправка сообщения

echo "test_kafka1" | /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-test-topic1

  • Проверка сообщения

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-test-topic1 --max-messages 1

Сообщение получено

  • Опишите тему

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-test-topic1 Topic:my-test-topic1 PartitionCount:1 ReplicationFactor:1 Configs: Topic: my-test-topic1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1

  • Стартовый отдых 4 брокера

zoo.cfg у каждого брокера с 1-го по 5-е (отличается только позиция 0.0.0.0:2888:3888)

tickTime=2000
initLimit=10
syncLimit=5

dataDir=/opt/zookeeper/data

clientPort=2181
maxClientCnxns=10
minSessionTimeout=4000
maxSessionTimeout=1000000
server.1=0.0.0.0:2888:3888
server.2=broker2_IP:broker2_2888:broker2_3888
server.3=broker3_IP:broker3_2888:broker3_3888
server.4=broker4_IP:broker4_2888:broker4_3888
server.5=broker5_IP:broker5_2888:broker5_3888

server.properties для каждого брокера с 1-го по 5-й (broker.id уникальны, broker_IP:broker_PORT отличается для каждого брокера)

broker.id=N
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://broker_IP:broker_PORT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.initial.rebalance.delay.ms=0

производитель.свойства на каждого брокера с 1 по 5

bootstrap.servers=localhost:9092
compression.type=none

Потребительские свойства на каждого брокера с 1 по 5

zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.id=test-consumer-group
  • Перезапуск zookeeper на каждом брокере для вступления в силу zoo.cfg

  • Zookeepers собираются в кластер

  • Тема перенесена в брокер 5

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-test-topic1 Topic:my-test-topic1 PartitionCount:1 ReplicationFactor:1 Configs: Topic: my-test-topic1 Partition: 0 Leader: 5 Replicas: 5 Isr: 5

Это нормальное поведение? Или это должно остаться на брокере 1?

  • Проверка сообщения на каждого брокера

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-test-topic1 --max-messages 1

Сообщение потеряно (сообщение не теряется, когда тема остается в брокере 1, поэтому это плавающая ситуация)

1 ответ

Вы пытались увеличить время до 6000? Исходя из настроек Hadoop, они используют это по умолчанию, заявив, что значение 2000 миллисекунд слишком мало. Я думаю, что то же самое применимо и здесь. Я сейчас работаю над очень похожей проблемой кафки.

В документации Kafka и описание конфигурации, и пример конфигурации рекомендуют указывать все серверы zookeeper в брокере. zookeeper.connect, Также в производстве ожидается, что вы запустите отдельный кластер Zookeeper и отдельный кластер Kafka, а не совместный запуск Kafka и ZK в одном док-контейнере.

Я полагаю, что-то подобное может происходить:

  • из-за некоторых особенностей того, как вы перезапускаете Docker-контейнеры, ZK 2-5 не знают, что Kafka 1 создал znode в ZK 1, описывающий вашу тестовую тему, чтобы иметь "Реплики: 1, ISR: 1", или не согласен использовать версию ZK 1, поскольку кворума нет
  • некоторое подмножество контейнеров 2-5 запускается и 3 из 5 ZK образуют кворум без ожидания ZK 1
  • что-то (потребительское или средство командной строки или автоматическое создание посредника) пытается использовать тему, и поскольку кворум ZK соглашается с тем, что он еще не существует, создает его и назначает реплику одному из доступных в настоящее время посредников (в данном случае 5),
  • контейнер 1 запускается, ZK 1 должен отказаться от своей версии темы znode в пользу кворума, Kafka должен отказаться от своей реплики в пользу описанной в настоящее время.

Я не уверен, каков правильный подход для перехода от Zookeeper с одним узлом к ​​реплицированной установке, и не могу найти его в документации. Возможно, вам придется изначально назначить больше weight вам первый ZK, так что вы гарантируете, что он станет лидером и заставит свою конфигурацию темы на других узлах ZK.

Вы создали проблему JIRA? Получил ответ от разработчиков?

Другие вопросы по тегам