Встроенный Kafka: KTable+KTable leftJoin производит дубликаты записей
Я прихожу в поисках знания тайного.
Во-первых, у меня есть две пары тем, по одной теме в каждой паре, которая входит в другую тему. Два KTables формируются последними темами, которые используются в KTable+KTable leftJoin. Проблема в том, что leftJoin создает ТРИ записи, когда я записываю одну запись в KTable. Я ожидал бы две записи в форме (A-ноль, AB), но вместо этого я получаю (A-ноль, AB, A-ноль). Я подтвердил, что KTables получают по одной записи каждый.
Я возился с CACHE_MAX_BYTES_BUFFERING_CONFIG, чтобы включить / отключить кэширование хранилища состояний. Поведение выше - с CACHE_MAX_BYTES_BUFFERING_CONFIG, установленным в 0. Когда я использую значение по умолчанию для CACHE_MAX_BYTES_BUFFERING_CONFIG, я вижу следующие записи, выводимые из объединения: (AB, AB, A-null)
Вот конфигурации для потоков, потребителей, производителей:
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapUrls);
properties.put(StreamsConfig.STATE_DIR_CONFIG, String.format("/tmp/kafka-streams/%s/%s",
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // fiddled with
properties.put(StreamsConfig.CLIENT_ID_CONFIG, appName);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, appName);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.cla
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
Код API процессора (санированный), который испытывает такое поведение, приведен ниже, обратите внимание на сопряженные темы [A1, A2] и [B1, B2]:
KTable<Long, Value> kTableA =
kstreamBuilder.table(longSerde, valueSerde, topicA2);
kstreamBuilder.stream(keySerde, envelopeSerde, topicA1)
.to(longSerde, valueSerde, topicA2);
kstreamBuilder.stream(keySerde, envelopeSerde, topicB1)
.to(longSerde, valueSerde, topicB2.topicName);
KTable<Long, Value> kTableB =
kstreamBuilder.table(longSerde, valueSerde, topicB2.topicName);
KTable<Long, Result> joinTable = kTableA.leftJoin(kTableB, (a,b) -> {
// value joiner called three times with only a single record input
// into topicA1 and topicB1
});
joinTable.groupBy(...)
.aggregate(...)
.to(longSerde, aggregateSerde, outputTopic);
Заранее спасибо за любую помощь, о доброжелательные.
Обновление: я работал с одним сервером kafka и 1 разделом на тему и испытал такое поведение. Когда я увеличил количество серверов до 2 и количество разделов до 3, мой вывод становится (A-null).
Мне кажется, мне нужно потратить еще немного времени с руководством по кафке...