Apache Beam Java KafkaIO пишет Avro - регистрируемая схема несовместима с более ранней схемой
Я получаю следующую ошибку при попытке написать сообщение на луч KafkaIO apache в формате AVRO.
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"MyClass","namespace":"my.namespace","fields":[{"name":"value","type":"double"}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
Конвейер просто читает одно и то же сообщение avro из темы Kafka и записывает в другую тему.
Для использования сообщения хорошо работает следующее:
Pipeline pipeline = Pipeline.create(options);
PTransform<PBegin, PCollection<KafkaRecord<String, MyClass>>> kafka_read = KafkaIO.<String, MyClass>read()
.withBootstrapServers("localhost:9092")
.withTopic("topic-in")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
.updateConsumerProperties(ImmutableMap.of("schema.registry.url", "http://localhost:8081"));
Ошибка появляется при попытке написать в Кафку
pipeline.apply(kafka_read)
.apply("Forward", ParDo.of(new TransformMyClass()))
.apply(KafkaIO.<String, MyClass>write()
.withBootstrapServers("localhost:9092")
.withTopic("topic-out")
.withKeySerializer(StringSerializer.class)
.withValueSerializer((Class) KafkaAvroSerializer.class)
.updateProducerProperties(ImmutableMap.of("schema.registry.url", "http://localhost:8081")));
MyClass был создан из схемы с использованием mvn generate-sources как у производителя, так и у этого потребителя / производителя.
Функция преобразования выглядит следующим образом:
public class TransformMyClass extends DoFn<KafkaRecord<String, MyClass>, KV<String, MyClass>> {
@ProcessElement
public void transformMyClass(ProcessContext ctx) {
ctx.output(KV.of("key", ctx.element().getKV().getValue()));
}}
Это правильный способ записи формата Avro с помощью KafkaIO?
Спасибо.