В ожидании выборов руководства в KafkaJS

Ситуация

Я использую kafkajs, чтобы писать в некоторые динамически генерируемые темы kafka.

Я обнаружил, что письма по этим темам сразу после регистрации моего продюсера регулярно вызывают ошибку: There is no leader for this topic-partition as we are in the middle of a leadership election.

Полная ошибка:

{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}

Код

Вот код, вызывающий проблему:

import kafka from 'myConfiguredKafkaJs'

const run = async () => {
  const producer = kafka.producer()
  await producer.connect()
  producer.send({
    topic: 'myRandomTopicString',
    messages: [{
      value: 'yolo',
    }],
  })
}

run()

Вопрос

Два вопроса:

  1. Есть ли что-то особенное, что я должен делать при подключении к производителю (или отправке), чтобы гарантировать, что логика блокируется до тех пор, пока производитель действительно не будет готов отправлять данные в тему kafka?
  2. Есть ли что-то особенное, что я должен делать при отправке данных производителю, чтобы гарантировать, что сообщения не будут отброшены?

1 ответ

Решение

Кафкайс предлагает createTopicsчерез клиент администратора, который имеет необязательныйwaitForLeaders флаг:

admin.createTopics({
  waitForLeaders: true,
  topics: [
    { topic: 'myRandomTopicString123' },
  ],
}

Использование этого решает проблему.

import kafka from 'myConfiguredKafkaJs'

const run = async () => {
  const producer = kafka.producer()
  const admin = kafka.admin()
  await admin.connect()
  await producer.connect()
  await admin.createTopics({
    waitForLeaders: true,
    topics: [
      { topic: 'myRandomTopicString123' },
    ],
  })
  producer.send({
    topic: 'myRandomTopicString',
    messages: [{
      value: 'yolo',
    }],
  })
}

run()

К сожалению, это приведет к другой ошибке, если тема уже существует, но это отдельный вопрос, и я подозреваю, что эта ошибка более информативна, чем нарушение.

{"level":"ERROR","timestamp":"2020-08-24T18:19:48.465Z","logger":"kafkajs","message":"[Connection] Response CreateTopics(key: 19, version: 2)","broker":"localhost:9092","clientId":"tv-kitchen","error":"Topic with this name already exists","correlationId":2,"size":86}
Другие вопросы по тегам