Kafka Stream работает с JoinWindow для воспроизведения данных
У меня есть 2 потока данных, и я хочу иметь возможность присоединиться к ним в течение 1 месяца, скажем. Когда у меня есть живые данные, с KStream все просто и весело. Я сделал что-то вроде этого;
KStream<String, GenericRecord> stream1 =
builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic1());
KStream<String, GenericRecord> stream2 =
builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic2());
long joinWindowSizeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days
KStream<String, GenericRecord> joinStream = stream1.join(stream2,
new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() {
@Override
public GenericRecord apply(GenericRecord genericRecord, GenericRecord genericRecord2) {
final GenericRecord jonnedRecord = new GenericData.Record(jonnedRecordSchema);
....
....
....
return jonnedRecord;
}
}, JoinWindows.of(joinWindowSizeMs));
Проблема появляется, когда я хочу сделать воспроизведение данных. скажем, я хочу повторно выполнить это объединение для данных, которые у меня есть за последние 6 месяцев, так как я запускаю конвейер для всех данных одновременно, kafkaStream объединит все соединяемые данные, и это не учитывает разницу во времени (что это должно только присоединиться к прошлому месяцу данных). Я предполагаю, что время JoinWindow - это время, когда мы вставляем данные в тему Kafka, я прав?
И как я могу изменить и манипулировать этим временем, чтобы я мог правильно запустить свое воспроизведение данных, я имею в виду, что для повторной вставки этих данных за последние 6 месяцев для каждой соответствующей записи должно потребоваться окно в один месяц и присоединение, основанное на этой записи.
Этот вопрос не является дубликатом Как управлять Kafka KStream для оконного соединения Kstream? Там я спросил о том, как я могу присоединиться, основываясь на окне времени. здесь я говорю о воспроизведении данных. Насколько я понимаю, во время присоединения к Kafka нужно время, когда данные вставляются в тему, как время для JoinWindow, поэтому, если вы хотите выполнить воспроизведение данных и повторно вставить данные за 6 месяцев назад, kafka примет их как новые данные, которые вставлен сегодня и присоединится к нему с некоторыми другими данными, которые на самом деле на сегодня, которые не должны.
1 ответ
API потоков Kafka использует временные метки, возвращаемые TimestampExtractor
вычислять объединения. По умолчанию это метка времени встроенных метаданных записи. (см. http://docs.confluent.io/current/streams/concepts.html)
По умолчанию, KafkaProducer
устанавливает эту метку времени на текущее системное время записи. (В качестве альтернативы вы можете настроить посредников для каждой темы на перезапись предоставленных производителем временных меток записей системным временем брокера в то время, когда брокер сохранил запись - это обеспечивает семантику "времени приема".)
Таким образом, это не проблема Kafka Streams как таковая.
Есть несколько вариантов решения проблемы:
Если ваши данные уже есть в теме, вы можете просто сбросить приложение Streams для повторной обработки старых данных. Для этого вы можете использовать инструмент сброса приложения (
bin/kafka-streams-application-reset.sh
). Вам также необходимо указатьauto.offset.reset
политика вearliest
в вашем приложении Streams. Проверьте документы - также, рекомендуется прочитать сообщение в блоге.
Это лучший подход, так как вам не нужно снова записывать данные в тему.
- Если ваши данные не в теме, и вам нужно записать данные, вы можете явно установить метку времени записи на уровне приложения, предоставив метку времени для каждой записи:
KafkaProducer producer = new KafkaProducer(...);
producer.send(new ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value));
Таким образом, если вы принимаете старые данные, вы можете установить временную метку явно, и Kafka Streams подберет ее и вычислит объединение соответствующим образом.