Производитель Apache Beam KafkaIO направляет разные сообщения в разные темы
У меня есть случай использования, когда входящие данные имеют ключ, который идентифицирует другой тип данных. Есть одна тема кафки ввода, в которую бросаются все типы данных. Пневматический конвейер считывает все сообщения из входной темы kafka и должен направлять в разные темы kafka в зависимости от ключа.
В настоящее время KafkaIO не поддерживает запись в несколько тем с использованием одного производителя. Следующий код - это внутренний рабочий кодKafkaIO.write()
final class AutoValue_KafkaIO_Write<K, V> extends Write<K, V> {
private final String topic;
private final WriteRecords<K, V> writeRecordsTransform;
private AutoValue_KafkaIO_Write(@Nullable String topic, WriteRecords<K, V> writeRecordsTransform) {
this.topic = topic;
this.writeRecordsTransform = writeRecordsTransform;
}
Как это сделать с помощью производителя kafkaIO луча apache?
1 ответ
После нескольких дней попыток добиться маршрутизации сообщений, в настоящее время kafkaIO не поддерживает маршрутизацию сообщений по разным темам.
Обходной путь - создать продюсера kafka для каждой отдельной темы и разделить элементы по разным коллекциям в зависимости от того, какие из них отправляются в разные темы kafka.