Клиент kafka отправляет запрос на раздел, где упал брокер

Я использую модуль kafka-node для отправки сообщения kafka. В кластерной среде, где у меня есть тема с 3 разделами и фактором репликации как 3.

Описание темы -

Topic:clusterTopic      PartitionCount:3        ReplicationFactor:3    Configs:min.insync.replicas=2,segment.bytes=1073741824
        Topic: clusterTopic     Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: clusterTopic     Partition: 1    Leader: 2       Replicas: 2,3,1 Isr: 1,2,3
        Topic: clusterTopic     Partition: 2    Leader: 3       Replicas: 3,1,2 Isr: 1,2,3

Конфигурация производителя -

        "requireAcks": 1,
        "attributes": 2,
        "partitionerType": 2,
        "retries": 2

Когда я отправляю данные, он следует за типом раздела как циклический (2), как в циклическом режиме

когда я выполняю следующие шаги

  • Получите экземпляр HighLevelProducer, подключенный к kafka:9092,kafka:9093
  • Отправить сообщение
  • остановить kafka-server:9092 вручную
  • попробуйте отправить другое сообщение с помощью HighLevelProducer, и send() вызовет обратный вызов с ошибкой: TimeoutError: Время ожидания запроса истекло через 30000 мс

Я ожидаю, что если раздел недоступен (поскольку брокер не работает), производитель должен автоматически отправлять данные в следующий доступный раздел, но я теряю сообщение из-за исключения

Исключение составляет следующее -

  TimeoutError: Request timed out after 3000ms
    at new TimeoutError (\package\node_modules\kafka-node\lib\errors\TimeoutError.js:6:9)
    at Timeout.timeoutId._createTimeout [as _onTimeout] (\package\node_modules\kafka-node\lib\kafkaClient.js:980:14)
    at ontimeout (timers.js:424:11)
    at tryOnTimeout (timers.js:288:5)
    at listOnTimeout (timers.js:251:5)
    at Timer.processTimers (timers.js:211:10)
(node:56416) [DEP0079] DeprecationWarning: Custom inspection function on Objects via .inspect() is deprecated
  kafka-node:KafkaClient kafka-node-client reconnecting to kafka1:9092 +3s
  kafka-node:KafkaClient createBroker kafka1 9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kafka1:9092 +3s
  kafka-node:KafkaClient createBroker kafka1 9092 +0ms

1 ответ

Пожалуйста, отправьте серверы начальной загрузки для подтверждения, но я полагаю, что вы испытываете, основываясь на имеющейся информации, следующее:

  • У вас есть min.insync.replicas равным 2
  • У вас установлено значение 1

С этими настройками производитель отправит событие реплике лидера и предположит, что сообщение безопасно.

Если он не удастся сразу после отправки, но до того, как подписчики догнали его, вы потеряете сообщение, поскольку ожидаете только одного подтверждения.

Однако с точки зрения брокера вы указываете, что для доступности темы требуется наличие двух синхронизируемых реплик. По умолчанию только синхронизированные реплики могут быть избраны лидерами. Поскольку отказ первого из них приведет к рассинхронизации подписчиков, ваша тема может быть отключена. Вы можете проверить это в своих тестах, предполагая некоторые настройки.

Чтобы исправить это, попробуйте следующее:

  1. Если высокая доступность наиболее важна, установите min.insync.replicas на 1 и acks на 1.
  2. Если потеря данных неприемлема, установите min.insync.replicas равным 2 и подтвердите все

Вы также можете установить unclean.leader.election.enable в значение true для высокой доступности, так как это позволит выбрать лидером несинхронизированную реплику, но тогда существует вероятность потери данных.

Другие вопросы по тегам