Kafks consumer.poll не возвращает данных
У меня есть два брокера Kafka (2.11-0.11.0.1). Коэффициент репликации тем по умолчанию установлен равным 2. Производители записывают данные только в нулевой раздел.
И у меня есть назначенный исполнитель, который периодически запускает задачу. Когда он использует тему с небольшим количеством записей в минуту (100 в минуту), он работает как шарм. Но для огромных тем (10K в минуту) метод poll не возвращает данных.
Задача:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public final class TopicToDbPump implements Runnable {
private static final Logger log = LoggerFactory.getLogger(TopicToDbPump.class);
private final String topic;
private final TopicPartition topicPartition;
private final Properties properties;
public TopicToDbPump(String topic, Properties properties) {
this.topic = topic;
topicPartition = new TopicPartition(topic, 0);
this.properties = properties;
}
@Override
public void run() {
try (final Consumer<String, String> consumer = new KafkaConsumer<>(properties)) {
consumer.assign(Collections.singleton(topicPartition));
final long offset = readOffsetFromDb(topic);
consumer.seek(topicPartition, offset);
final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
if (records.isEmpty()) {
log.debug("No data from topic " + topic + " available");
return;
}
saveData(records.records(topic));
} catch (Throwable t) {
log.error("Etl process " + topic + " failed with exception", t);
}
}
}
Параметры потребителей:
"bootstrap.servers" = "host-1:9092,host-2:9092",
"group.id" = "my-group",
"enable.auto.commit" = "false",
"key.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer",
"max.partition.fetch.bytes": "50000000",
"max.poll.records": "10000"
В чем дело?
1 ответ
API-интерфейс Kafka Consumer не гарантирует, что первый вызов poll()
вернет любые данные.
Потребитель сначала должен подключиться к кластеру, найти лидеров для всех разделов, которым он назначен. Как вы думаете, это может занять несколько секунд, поэтому вряд ли данные будут получены немедленно.
Вы должны вместо этого позвонить poll()
несколько раз, если данные не возвращаются первыми.