Как вывести схему avro из темы kafka в Apache Beam KafkaIO
Я использую kafkaIO Apache Beam для чтения из раздела, имеющего схему avro в реестре схем Confluent. Я могу десериализовать сообщение и записать в файлы. Но в конечном итоге я хочу написать в BigQuery. Мой конвейер не может определить схему. Как мне извлечь / вывести схему и прикрепить ее к данным в конвейере, чтобы мои последующие процессы (запись в BigQuery) могли вывести схему?
Вот код, в котором я использую URL-адрес реестра схемы для установки десериализатора и где я читаю из Kafka:
consumerConfig.put(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
options.getSchemaRegistryUrl());
String schemaUrl = options.getSchemaRegistryUrl().get();
String subj = options.getSubject().get();
ConfluentSchemaRegistryDeserializerProvider<GenericRecord> valDeserializerProvider =
ConfluentSchemaRegistryDeserializerProvider.of(schemaUrl, subj);
pipeline
.apply("Read from Kafka",
KafkaIO
.<byte[], GenericRecord>read()
.withBootstrapServers(options.getKafkaBrokers().get())
.withTopics(Utils.getListFromString(options.getKafkaTopics()))
.withConsumerConfigUpdates(consumerConfig)
.withValueDeserializer(valDeserializerProvider)
.withKeyDeserializer(ByteArrayDeserializer.class)
.commitOffsetsInFinalize()
.withoutMetadata()
);
Сначала я подумал, что этого будет достаточно для луча, чтобы вывести схему, но этого не произошло, поскольку hasSchema() возвращает false.
Любая помощь будет оценена.
2 ответа
В настоящее время ведется работа по поддержке вывода схемы Avro, хранящейся в реестре конфлюентных схем, вKafkaIO
. Впрочем, теперь это можно сделать и в пользовательском конвейерном коде.
Этот код, вероятно, будет работать, но я еще не тестировал.
// Fetch Avro schema from CSR
SchemaRegistryClient registryClient = new CachedSchemaRegistryClient("schema_registry_url", 10);
SchemaMetadata latestSchemaMetadata = registryClient.getLatestSchemaMetadata("schema_name");
Schema avroSchema = new Schema.Parser().parse(latestSchemaMetadata.getSchema());
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
// Create KafkaIO.Read with Avro schema deserializer
KafkaIO.Read<String, GenericRecord> read = KafkaIO.<String, GenericRecord>read()
.withBootstrapServers("host:port")
.withTopic("topic_name")
.withConsumerConfigUpdates(ImmutableMap.of("schema.registry.url", schemaRegistryUrl))
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(avroSchema));
// Apply Kafka.Read and set Beam schema based on Avro Schema
p.apply(read)
.apply(Values.<GenericRecord>create()).setSchema(schema,
AvroUtils.getToRowFunction(GenericRecord.class, avroSchema),
AvroUtils.getFromRowFunction(GenericRecord.class))
Тогда я думаю, ты можешь использовать BigQueryIO.Write
с участием useBeamSchema()
.