Как установить AvroCoder с KafkaIO и Apache Beam с Java
Я пытаюсь создать конвейер, который передает данные из темы Kafka в BigQuery Google. Данные в теме есть в Авро.
Я вызываю функцию apply 3 раза. Один раз прочитать из Kafka, один раз извлечь запись и один раз написать в BigQuery. Вот основная часть кода:
pipeline
.apply("Read from Kafka",
KafkaIO
.<byte[], GenericRecord>read()
.withBootstrapServers(options.getKafkaBrokers().get())
.withTopics(Utils.getListFromString(options.getKafkaTopics()))
.withKeyDeserializer(
ConfluentSchemaRegistryDeserializerProvider.of(
options.getSchemaRegistryUrl().get(),
options.getSubject().get())
)
.withValueDeserializer(
ConfluentSchemaRegistryDeserializerProvider.of(
options.getSchemaRegistryUrl().get(),
options.getSubject().get()))
.withoutMetadata()
)
.apply("Extract GenericRecord",
MapElements.into(TypeDescriptor.of(GenericRecord.class)).via(KV::getValue)
)
.apply(
"Write data to BQ",
BigQueryIO
.<GenericRecord>write()
.optimizedWrites()
.useBeamSchema()
.useAvroLogicalTypes()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withSchemaUpdateOptions(ImmutableSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
//Temporary location to save files in GCS before loading to BQ
.withCustomGcsTempLocation(options.getGcsTempLocation())
.withNumFileShards(options.getNumShards().get())
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withMethod(FILE_LOADS)
.withTriggeringFrequency(Utils.parseDuration(options.getWindowDuration().get()))
.to(new TableReference()
.setProjectId(options.getGcpProjectId().get())
.setDatasetId(options.getGcpDatasetId().get())
.setTableId(options.getGcpTableId().get()))
);
При запуске я получаю следующую ошибку:
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Extract GenericRecord/Map/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder().
Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for org.apache.avro.generic.GenericRecord.
Building a Coder using a registered CoderProvider failed.
Как мне настроить кодировщик на правильное чтение Avro?
1 ответ
К этому есть как минимум три подхода:
- Установите кодировщик inline:
pipeline.apply("Read from Kafka", ....)
.apply("Dropping key", Values.create())
.setCoder(AvroCoder.of(Schema schemaOfGenericRecord))
.apply("Write data to BQ", ....);
Обратите внимание, что ключ отбрасывается, потому что он не используется, поэтому вам больше не понадобятся MapElements.
- Зарегистрируйте кодировщик в экземпляре конвейера CoderRegistry:
pipeline.getCoderRegistry().registerCoderForClass(GenericRecord.class, AvroCoder.of(Schema genericSchema));
- Получите кодировщик из реестра схем через:
ConfluentSchemaRegistryDeserializerProvider.getCoder(CoderRegistry registry)