Как отправить ArrayList объектов в Kafka Avro продюсер, не вызывая метод send для каждой отдельной записи?

Я использую приведенный ниже код для создания записей Avro User класс в тему Кафки, и она работает нормально;

Класс отправителя

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import vo.User;

public class Sender8 {

    public static void main(String[] args) {

        User user = new User(10,"testName");
        Schema schema = ReflectData.get().getSchema(user.getClass());
        new GenericData.Record(schema);

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://127.0.0.1:8081");

        KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);

        ReflectDatumWriter<Object> reflectDatumWriter = new ReflectDatumWriter<>(schema);
        GenericDatumReader<Object> genericRecordReader = new GenericDatumReader<>(schema);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        try {
            reflectDatumWriter.write(user, EncoderFactory.get().directBinaryEncoder(bytes, null));
            GenericRecord avroRecord2 = (GenericRecord) genericRecordReader.read(null, DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null));
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>("avrotesttopic1", avroRecord2);
            producer.send(record);
            producer.flush();

        } catch (IOException e1) {
            e1.printStackTrace();
        }

        producer.close();
    }
}

Класс пользователя

public class User {
    int id;
    String name;

    public User(int id, String name) {
        super();
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

Иногда мне может понадобиться отправить коллекцию объектов в виде массива, например;

ArrayList<User> users = new ArrayList<User>();

в этой ситуации я создаю цикл для итерации по списку, выбора отдельных записей и вызова send() метод, как;

Iterator iter = users.iterator();
while (iter.hasNext()) {
   user = iter.next();
   //all other stuff here
   producer.send(record);
}

Это отлично работает. Но проблема в том, что если у моего архива 50 записей, producer.send(record) будет срабатывать 50 раз. Я хотел бы знать, существует ли какой-либо другой более эффективный метод для обработки этого, например, вызов отправителя только один раз для всех 50 записей.

1 ответ

Неясно, ожидает ли ваша тема одно сообщение с 50 записями в массиве или 50 отдельными пользовательскими сообщениями.

Если отдельные сообщения, это ожидаемое поведение. Там нет накладных расходов на звонки producer.send несколько раз. Это как сказать System.out.printи все, что вы делаете, это записываете данные в Kafka вместо консоли.

Даже видеть, что этот пример использует цикл while

Посмотри в pom.xml так же как src/main/avro чтобы увидеть, где используется плагин Avro и LogLine класс определены.

Если одна запись из 50 результатов, вам нужно создать схему для List<User> или определить класс как

class UserList {
    List<User> users;
}

Кроме того, как уже упоминалось в предыдущем посте, если вы просто используете плагин Avro Maven, эти классы могут быть созданы для вас

Например, в AVDL и начало работы с Avro в Java

@namespace("com.example")
protocol DomainModels {
    record User {
      int id;
      string name;
    }
}

Автоматически создаст схему Avro (avsc) и класс Java для com.example.User и getters/setters, equalsTo, toString и т. д.

Затем вы используете SpecificRecord Тип а не GenericRecord вот так

Producer<String, User> producer = new KafkaProducer<String, User>(props);
for (User u : list) {
    producer.send(u);
}

потому что сгенерированный класс User будет расширяться SpecificRecord


Опять же, если у вас был список объектов в Avro, то AVDL поддерживает массивы

@namespace("com.example")
protocol DomainModels {
    record User {
      int id;
      string name;
    }

    record UserList {
       array<User> users;
    }
}

Альтернативой тому, что вы делаете в данный момент, является использование формата AVSC, встроенного в код (или лучше считывание из файла), но это, по сути, то, что ReflectDatum генерирует.

Лично я не вижу необходимости в сборщике Reflect Avro, если у вас просто простой Java-объект без бизнес-логики. И если вам нужна бизнес-логика с сгенерированными классами из файлов AVDL/AVSC, вы можете более или менее извлечь ее для разделения служебных классов.

Другие вопросы по тегам