Потребителям Apache Beam KafkaIO в группе потребителей назначается уникальный идентификатор группы
Я запускаю несколько экземпляров apache beam KafkaIO с помощью DirectRunner, которые читаются из той же темы. Но сообщение доставляется во все запущенные экземпляры. После просмотра конфигурации Kafka, которую я нашел, к имени группы добавляется уникальный префикс, и каждый экземпляр имеет уникальное имя группы.
- group.id = Reader-0_offset_consumer_559337182_my_group
- group.id = Reader-0_offset_consumer_559337345_my_group
Таким образом, каждому экземпляру назначен уникальный group.id, и именно поэтому сообщения доставляются во все экземпляры.
pipeline.apply("ReadFromKafka", KafkaIO.<String, String>read().withReadCommitted()
.withConsumerConfigUpdates(
new ImmutableMap.Builder<String, Object>().put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group")
.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5).build())
.withKeyDeserializer(StringDeserializer.class).withValueDeserializer(StringDeserializer.class)
.withBootstrapServers(servers).withTopics(Collections.singletonList(topicName)).withoutMetadata()
Итак, какую конфигурацию я должен предоставить, чтобы все потребители в группе не читали одно и то же сообщение
1 ответ
Да, это происходит потому, что к имени группы добавляется уникальный префикс, и каждый экземпляр имеет уникальное имя группы. Из-за этого кафка не знает, запускаете ли вы еще один экземпляр. Следовательно, всем потребителям доставляются одни и те же сообщения.
Следовательно, я мог придумать один способ решения проблемы: вместо того, чтобы указать тему и позволить луче вычислить количество потребителей для всех разделов, вы можете явно указать разделы темы для каждого экземпляра apache beam KafkaIO с помощью DirectRunner.
Вам нужно будет пройти List
типа TopicPartition
к методу withTopicPartitions
.
KafkaIO.<String, String>read()
.withCreateTime(Duration.standardMinutes(1))
.withReadCommitted()
.withBootstrapServers(endPoint)
.withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
.put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
.build())
.withTopicPartitions(Arrays.asList(new TopicPartition(topicName, 0)))
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata();
Приведенный выше код будет читать сообщения только от partition 0
. Следовательно, таким образом вы можете запускать несколько экземпляров одной и той же программы без доставки одинаковых сообщений всем потребителям.