Производитель Apache Beam KafkaIO направляет разные сообщения в разные темы

У меня есть случай использования, когда входящие данные имеют ключ, который идентифицирует другой тип данных. Есть одна тема кафки ввода, в которую бросаются все типы данных. Пневматический конвейер считывает все сообщения из входной темы kafka и должен направлять в разные темы kafka в зависимости от ключа.

В настоящее время KafkaIO не поддерживает запись в несколько тем с использованием одного производителя. Следующий код - это внутренний рабочий кодKafkaIO.write()

final class AutoValue_KafkaIO_Write<K, V> extends Write<K, V> {
    private final String topic;
    private final WriteRecords<K, V> writeRecordsTransform;

    private AutoValue_KafkaIO_Write(@Nullable String topic, WriteRecords<K, V> writeRecordsTransform) {
        this.topic = topic;
        this.writeRecordsTransform = writeRecordsTransform;
    }

Как это сделать с помощью производителя kafkaIO луча apache?

1 ответ

Решение

После нескольких дней попыток добиться маршрутизации сообщений, в настоящее время kafkaIO не поддерживает маршрутизацию сообщений по разным темам.

Обходной путь - создать продюсера kafka для каждой отдельной темы и разделить элементы по разным коллекциям в зависимости от того, какие из них отправляются в разные темы kafka.

Другие вопросы по тегам