Как вывести схему из реестра Confluent Schema Registry с помощью Apache Beam?
Я пытаюсь создать конвейер Apache Beam, в котором я читаю тему kafka и загружаю ее в Bigquery. Используя реестр схем Confluent, я должен иметь возможность вывести схему при загрузке в Bigquey. Однако схема не выводится, когда загрузка завершается неудачей.
Ниже представлен весь код конвейера.
pipeline
.apply("Read from Kafka",
KafkaIO
.<byte[], GenericRecord>read()
.withBootstrapServers("broker-url:9092")
.withTopic("beam-in")
.withConsumerConfigUpdates(consumerConfig)
.withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(schemaRegUrl, subj))
.withKeyDeserializer(ByteArrayDeserializer.class)
.commitOffsetsInFinalize()
.withoutMetadata()
)
.apply("Drop Kafka message key", Values.create())
.apply(
"Write data to BQ",
BigQueryIO
.<GenericRecord>write()
.optimizedWrites()
.useBeamSchema()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withSchemaUpdateOptions(ImmutableSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
.withCustomGcsTempLocation("gs://beam-tmp-load")
.withNumFileShards(10)
.withMethod(FILE_LOADS)
.withTriggeringFrequency(Utils.parseDuration("10s"))
.to(new TableReference()
.setProjectId("my-project")
.setDatasetId("loaded-data")
.setTableId("beam-load-test")
);
return pipeline.run();
При запуске я получаю следующую ошибку, которая связана с тем, что я вызываю useBeamSchema(), а hasSchema() возвращает false:
Exception in thread "main" java.lang.IllegalArgumentException
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:2595)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:2579)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1726)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:368)
at KafkaToBigQuery.run(KafkaToBigQuery.java:159)
at KafkaToBigQuery.main(KafkaToBigQuery.java:64)