Как вывести схему avro из темы kafka в Apache Beam KafkaIO

Я использую kafkaIO Apache Beam для чтения из раздела, имеющего схему avro в реестре схем Confluent. Я могу десериализовать сообщение и записать в файлы. Но в конечном итоге я хочу написать в BigQuery. Мой конвейер не может определить схему. Как мне извлечь / вывести схему и прикрепить ее к данным в конвейере, чтобы мои последующие процессы (запись в BigQuery) могли вывести схему?

Вот код, в котором я использую URL-адрес реестра схемы для установки десериализатора и где я читаю из Kafka:

    consumerConfig.put(
                        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, 
                        options.getSchemaRegistryUrl());

String schemaUrl = options.getSchemaRegistryUrl().get();
String subj = options.getSubject().get();

ConfluentSchemaRegistryDeserializerProvider<GenericRecord> valDeserializerProvider =
            ConfluentSchemaRegistryDeserializerProvider.of(schemaUrl, subj);

pipeline
        .apply("Read from Kafka",
                KafkaIO
                        .<byte[], GenericRecord>read()
                        .withBootstrapServers(options.getKafkaBrokers().get())
                        .withTopics(Utils.getListFromString(options.getKafkaTopics()))
                        .withConsumerConfigUpdates(consumerConfig)
                        .withValueDeserializer(valDeserializerProvider)
                        .withKeyDeserializer(ByteArrayDeserializer.class)

                        .commitOffsetsInFinalize()
                        .withoutMetadata()

        );

Сначала я подумал, что этого будет достаточно для луча, чтобы вывести схему, но этого не произошло, поскольку hasSchema() возвращает false.

Любая помощь будет оценена.

2 ответа

В настоящее время ведется работа по поддержке вывода схемы Avro, хранящейся в реестре конфлюентных схем, вKafkaIO. Впрочем, теперь это можно сделать и в пользовательском конвейерном коде.

Этот код, вероятно, будет работать, но я еще не тестировал.

// Fetch Avro schema from CSR
SchemaRegistryClient registryClient = new CachedSchemaRegistryClient("schema_registry_url", 10);
SchemaMetadata latestSchemaMetadata = registryClient.getLatestSchemaMetadata("schema_name");
Schema avroSchema = new Schema.Parser().parse(latestSchemaMetadata.getSchema());

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);


// Create KafkaIO.Read with Avro schema deserializer
KafkaIO.Read<String, GenericRecord> read = KafkaIO.<String, GenericRecord>read()
    .withBootstrapServers("host:port")
    .withTopic("topic_name")
    .withConsumerConfigUpdates(ImmutableMap.of("schema.registry.url", schemaRegistryUrl))
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(avroSchema));

// Apply Kafka.Read and set Beam schema based on Avro Schema
p.apply(read)
 .apply(Values.<GenericRecord>create()).setSchema(schema,
    AvroUtils.getToRowFunction(GenericRecord.class, avroSchema),
    AvroUtils.getFromRowFunction(GenericRecord.class))

Тогда я думаю, ты можешь использовать BigQueryIO.Write с участием useBeamSchema().