Kafka Stream работает с JoinWindow для воспроизведения данных

У меня есть 2 потока данных, и я хочу иметь возможность присоединиться к ним в течение 1 месяца, скажем. Когда у меня есть живые данные, с KStream все просто и весело. Я сделал что-то вроде этого;

KStream<String, GenericRecord> stream1 =
            builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic1());

KStream<String, GenericRecord> stream2 =
            builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic2());

long joinWindowSizeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days

    KStream<String, GenericRecord> joinStream = stream1.join(stream2,
            new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() {
                @Override
                public GenericRecord apply(GenericRecord genericRecord, GenericRecord genericRecord2) {
                    final GenericRecord jonnedRecord = new GenericData.Record(jonnedRecordSchema);
                    ....
                    ....
                    ....

                    return jonnedRecord;
                }
            }, JoinWindows.of(joinWindowSizeMs));

Проблема появляется, когда я хочу сделать воспроизведение данных. скажем, я хочу повторно выполнить это объединение для данных, которые у меня есть за последние 6 месяцев, так как я запускаю конвейер для всех данных одновременно, kafkaStream объединит все соединяемые данные, и это не учитывает разницу во времени (что это должно только присоединиться к прошлому месяцу данных). Я предполагаю, что время JoinWindow - это время, когда мы вставляем данные в тему Kafka, я прав?
И как я могу изменить и манипулировать этим временем, чтобы я мог правильно запустить свое воспроизведение данных, я имею в виду, что для повторной вставки этих данных за последние 6 месяцев для каждой соответствующей записи должно потребоваться окно в один месяц и присоединение, основанное на этой записи.

Этот вопрос не является дубликатом Как управлять Kafka KStream для оконного соединения Kstream? Там я спросил о том, как я могу присоединиться, основываясь на окне времени. здесь я говорю о воспроизведении данных. Насколько я понимаю, во время присоединения к Kafka нужно время, когда данные вставляются в тему, как время для JoinWindow, поэтому, если вы хотите выполнить воспроизведение данных и повторно вставить данные за 6 месяцев назад, kafka примет их как новые данные, которые вставлен сегодня и присоединится к нему с некоторыми другими данными, которые на самом деле на сегодня, которые не должны.

1 ответ

Решение

API потоков Kafka использует временные метки, возвращаемые TimestampExtractor вычислять объединения. По умолчанию это метка времени встроенных метаданных записи. (см. http://docs.confluent.io/current/streams/concepts.html)

По умолчанию, KafkaProducer устанавливает эту метку времени на текущее системное время записи. (В качестве альтернативы вы можете настроить посредников для каждой темы на перезапись предоставленных производителем временных меток записей системным временем брокера в то время, когда брокер сохранил запись - это обеспечивает семантику "времени приема".)

Таким образом, это не проблема Kafka Streams как таковая.

Есть несколько вариантов решения проблемы:

  1. Если ваши данные уже есть в теме, вы можете просто сбросить приложение Streams для повторной обработки старых данных. Для этого вы можете использовать инструмент сброса приложения (bin/kafka-streams-application-reset.sh). Вам также необходимо указать auto.offset.reset политика в earliest в вашем приложении Streams. Проверьте документы - также, рекомендуется прочитать сообщение в блоге.

Это лучший подход, так как вам не нужно снова записывать данные в тему.

  1. Если ваши данные не в теме, и вам нужно записать данные, вы можете явно установить метку времени записи на уровне приложения, предоставив метку времени для каждой записи:
KafkaProducer producer = new KafkaProducer(...);
producer.send(new ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value));

Таким образом, если вы принимаете старые данные, вы можете установить временную метку явно, и Kafka Streams подберет ее и вычислит объединение соответствующим образом.

Другие вопросы по тегам