Потребители Apache Beam KafkaIO в группе потребителей читают одно и то же сообщение

Я использую KafkaIO в потоке данных для чтения сообщений из одной темы. Я использую следующий код.

KafkaIO.<String, String>read()
                .withReadCommitted()
                .withBootstrapServers(endPoint)
                .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
                .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 8000).put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000)
                .build())
//                .commitOffsetsInFinalize()
                .withTopics(Collections.singletonList(topicNames))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata();

Я запускаю программу потока данных на своем локальном компьютере с помощью прямого бегуна. Все работает нормально. Я запускаю другой экземпляр той же программы параллельно, то есть другого потребителя. Теперь я вижу повторяющиеся сообщения при обработке конвейера.

Хотя я предоставил идентификатор группы потребителей, запуск другого потребителя с тем же идентификатором группы потребителей (другой экземпляр той же программы) не должен обрабатывать те же элементы, которые обрабатываются другим потребителем, верно?

Как это получается при использовании бегуна потока данных?

1 ответ

Решение

Я не думаю, что заданные вами параметры гарантируют неповторяющуюся доставку сообщений по конвейерам.

  • ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG: это флаг для потребителя Kafka, а не для самого конвейера Beam. Похоже, это лучший способ и периодический, поэтому вы все равно можете видеть дубликаты в нескольких конвейерах.

  • withReadCommitted(): это просто означает, что Beam не будет читать незафиксированные сообщения. Опять же, это не предотвратит дублирование в нескольких конвейерах.

См. Здесь протокол использования источника Beam для определения начальной точки источника Kafka.

Чтобы гарантировать бесперебойную доставку, вероятно, вам придется читать из разных тем или из разных подписок.