В ожидании выборов руководства в KafkaJS
Ситуация
Я использую kafkajs, чтобы писать в некоторые динамически генерируемые темы kafka.
Я обнаружил, что письма по этим темам сразу после регистрации моего продюсера регулярно вызывают ошибку: There is no leader for this topic-partition as we are in the middle of a leadership election
.
Полная ошибка:
{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}
Код
Вот код, вызывающий проблему:
import kafka from 'myConfiguredKafkaJs'
const run = async () => {
const producer = kafka.producer()
await producer.connect()
producer.send({
topic: 'myRandomTopicString',
messages: [{
value: 'yolo',
}],
})
}
run()
Вопрос
Два вопроса:
- Есть ли что-то особенное, что я должен делать при подключении к производителю (или отправке), чтобы гарантировать, что логика блокируется до тех пор, пока производитель действительно не будет готов отправлять данные в тему kafka?
- Есть ли что-то особенное, что я должен делать при отправке данных производителю, чтобы гарантировать, что сообщения не будут отброшены?
1 ответ
Решение
Кафкайс предлагает createTopics
через клиент администратора, который имеет необязательныйwaitForLeaders
флаг:
admin.createTopics({
waitForLeaders: true,
topics: [
{ topic: 'myRandomTopicString123' },
],
}
Использование этого решает проблему.
import kafka from 'myConfiguredKafkaJs'
const run = async () => {
const producer = kafka.producer()
const admin = kafka.admin()
await admin.connect()
await producer.connect()
await admin.createTopics({
waitForLeaders: true,
topics: [
{ topic: 'myRandomTopicString123' },
],
})
producer.send({
topic: 'myRandomTopicString',
messages: [{
value: 'yolo',
}],
})
}
run()
К сожалению, это приведет к другой ошибке, если тема уже существует, но это отдельный вопрос, и я подозреваю, что эта ошибка более информативна, чем нарушение.
{"level":"ERROR","timestamp":"2020-08-24T18:19:48.465Z","logger":"kafkajs","message":"[Connection] Response CreateTopics(key: 19, version: 2)","broker":"localhost:9092","clientId":"tv-kitchen","error":"Topic with this name already exists","correlationId":2,"size":86}