Apache Beam KafkaIO упоминает раздел темы вместо имени темы

Apache Beam KafkaIO поддерживает для потребителей kafka чтение только из указанных разделов. У меня есть следующий код.

KafkaIO.<String, String>read()
                .withCreateTime(Duration.standardMinutes(1))
                .withReadCommitted()
                .withBootstrapServers(endPoint)
                .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                        .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                        .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
                        .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
                        .build())
                .commitOffsetsInFinalize()
                .withTopicPartitions(List<TopicPartitions>)

У меня следующие 2 вопроса.

  1. Как мне получить имена разделов из кафки? Как мне упомянуть это в kafkaIO?
  2. Создает ли луч Apache количество потребителей kafka, равное списку разделов, упомянутому при создании потребителя kafka?

1 ответ

Решение

Я сам нашел ответы.

Как мне сказать kafkaIO читать с определенных разделов?

kafkaIO имеет метод withTopicPartitions(List<TopicPartitions>) который принимает список TopicPartition объекты.

Разделы тем именуются последовательными номерами, начиная с нуля. Следовательно, следующее должно работать

KafkaIO.<String, String>read()
                .withCreateTime(Duration.standardMinutes(1))
                .withReadCommitted()
                .withBootstrapServers(endPoint)
                .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                        .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                        .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
                        .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
                        .build())
                .commitOffsetsInFinalize()
                .withTopicPartitions(Arrays.asList(new TopicPartition(topicName, 0),new TopicPartition(topicName, 1),new TopicPartition(topicName, 2)))

Чтобы проверить это, используйте kafkacat и следующая команда

kafkacat -P -b localhost:9092 -t sample -p 0 - Эта команда производит до указанного раздела.

Создает ли луч Apache количество потребителей kafka, равное списку разделов, упомянутому при создании потребителя kafka?

Он порождает одну группу потребителей с количеством потребителей, равным количеству разделов, явно указанных при создании объекта kafka Producer.

Другие вопросы по тегам