Apache Beam Java KafkaIO пишет Avro - регистрируемая схема несовместима с более ранней схемой

Я получаю следующую ошибку при попытке написать сообщение на луч KafkaIO apache в формате AVRO.

Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"MyClass","namespace":"my.namespace","fields":[{"name":"value","type":"double"}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409

Конвейер просто читает одно и то же сообщение avro из темы Kafka и записывает в другую тему.

Для использования сообщения хорошо работает следующее:

Pipeline pipeline = Pipeline.create(options);

PTransform<PBegin, PCollection<KafkaRecord<String, MyClass>>> kafka_read = KafkaIO.<String, MyClass>read()
  .withBootstrapServers("localhost:9092")
  .withTopic("topic-in")
  .withKeyDeserializer(StringDeserializer.class)
  .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
  .updateConsumerProperties(ImmutableMap.of("schema.registry.url", "http://localhost:8081"));

Ошибка появляется при попытке написать в Кафку

pipeline.apply(kafka_read)
  .apply("Forward", ParDo.of(new TransformMyClass()))
  .apply(KafkaIO.<String, MyClass>write()
    .withBootstrapServers("localhost:9092")
    .withTopic("topic-out")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer((Class) KafkaAvroSerializer.class)
    .updateProducerProperties(ImmutableMap.of("schema.registry.url", "http://localhost:8081")));

MyClass был создан из схемы с использованием mvn generate-sources как у производителя, так и у этого потребителя / производителя.

Функция преобразования выглядит следующим образом:

public class TransformMyClass extends DoFn<KafkaRecord<String, MyClass>, KV<String, MyClass>> {
@ProcessElement
public void transformMyClass(ProcessContext ctx) {
ctx.output(KV.of("key", ctx.element().getKV().getValue()));
}}

Это правильный способ записи формата Avro с помощью KafkaIO?

Спасибо.

0 ответов

Другие вопросы по тегам