Как вывести схему из реестра Confluent Schema Registry с помощью Apache Beam?

Я пытаюсь создать конвейер Apache Beam, в котором я читаю тему kafka и загружаю ее в Bigquery. Используя реестр схем Confluent, я должен иметь возможность вывести схему при загрузке в Bigquey. Однако схема не выводится, когда загрузка завершается неудачей.

Ниже представлен весь код конвейера.

    pipeline
        .apply("Read from Kafka",
                KafkaIO
                        .<byte[], GenericRecord>read()
                        .withBootstrapServers("broker-url:9092")
                        .withTopic("beam-in")
                        .withConsumerConfigUpdates(consumerConfig)
                        .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(schemaRegUrl, subj))
                        .withKeyDeserializer(ByteArrayDeserializer.class)
                        .commitOffsetsInFinalize()
                        .withoutMetadata()

        )
        .apply("Drop Kafka message key", Values.create())
        .apply(
                "Write data to BQ",
                BigQueryIO
                        .<GenericRecord>write()
                        .optimizedWrites()
                        .useBeamSchema()
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withSchemaUpdateOptions(ImmutableSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
                        .withCustomGcsTempLocation("gs://beam-tmp-load")
                        .withNumFileShards(10)
                        .withMethod(FILE_LOADS)
                        .withTriggeringFrequency(Utils.parseDuration("10s"))
                        .to(new TableReference()
                                .setProjectId("my-project")
                                .setDatasetId("loaded-data")
                                .setTableId("beam-load-test")
        );
return pipeline.run();

При запуске я получаю следующую ошибку, которая связана с тем, что я вызываю useBeamSchema(), а hasSchema() возвращает false:

Exception in thread "main" java.lang.IllegalArgumentException
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:2595)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:2579)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1726)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:368)
at KafkaToBigQuery.run(KafkaToBigQuery.java:159)
at KafkaToBigQuery.main(KafkaToBigQuery.java:64)

0 ответов