Потребителям Apache Beam KafkaIO в группе потребителей назначается уникальный идентификатор группы

Я запускаю несколько экземпляров apache beam KafkaIO с помощью DirectRunner, которые читаются из той же темы. Но сообщение доставляется во все запущенные экземпляры. После просмотра конфигурации Kafka, которую я нашел, к имени группы добавляется уникальный префикс, и каждый экземпляр имеет уникальное имя группы.

  1. group.id = Reader-0_offset_consumer_559337182_my_group
  2. group.id = Reader-0_offset_consumer_559337345_my_group

Таким образом, каждому экземпляру назначен уникальный group.id, и именно поэтому сообщения доставляются во все экземпляры.

pipeline.apply("ReadFromKafka", KafkaIO.<String, String>read().withReadCommitted()
            .withConsumerConfigUpdates(
                    new ImmutableMap.Builder<String, Object>().put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
                            .put(ConsumerConfig.GROUP_ID_CONFIG, "my_group")
                            .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5).build())
            .withKeyDeserializer(StringDeserializer.class).withValueDeserializer(StringDeserializer.class)
            .withBootstrapServers(servers).withTopics(Collections.singletonList(topicName)).withoutMetadata()

Итак, какую конфигурацию я должен предоставить, чтобы все потребители в группе не читали одно и то же сообщение

1 ответ

Да, это происходит потому, что к имени группы добавляется уникальный префикс, и каждый экземпляр имеет уникальное имя группы. Из-за этого кафка не знает, запускаете ли вы еще один экземпляр. Следовательно, всем потребителям доставляются одни и те же сообщения.

Следовательно, я мог придумать один способ решения проблемы: вместо того, чтобы указать тему и позволить луче вычислить количество потребителей для всех разделов, вы можете явно указать разделы темы для каждого экземпляра apache beam KafkaIO с помощью DirectRunner.

Вам нужно будет пройти List типа TopicPartition к методу withTopicPartitions.

KafkaIO.<String, String>read()
                .withCreateTime(Duration.standardMinutes(1))
                .withReadCommitted()
                .withBootstrapServers(endPoint)
                .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                        .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                        .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
                        .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
                        .build())
                .withTopicPartitions(Arrays.asList(new TopicPartition(topicName, 0)))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata();

Приведенный выше код будет читать сообщения только от partition 0. Следовательно, таким образом вы можете запускать несколько экземпляров одной и той же программы без доставки одинаковых сообщений всем потребителям.

Другие вопросы по тегам