Avro ByteBuffer.allocate java.lang.IllegalArgumentException: null

Я пытаюсь отправить данные через поток Kinesis в формате Avro. Но я столкнулся с проблемой - IllegalArgumentException происходил все время, вот мой код:

user.avsc

{
    "doc": "User schema",
    "namespace": "avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"}
    ]
}

Режиссер:

User user = User.newBuilder().setName("Igor").build()
byte[] bytes = toAvro(user); 


kinesis.putRecord(
                    new PutRecordRequest()
                            .withStreamName(streamName)
                            .withData(ByteBuffer.wrap(bytes))
                            .withPartitionKey("test_partition_key")
            );

private static byte[] toAvro(GenericContainer obj) throws IOException {
        DatumWriter datumWriter = new SpecificDatumWriter();
        byte[] bytes;
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataFileWriter dataFileWriter = new DataFileWriter(datumWriter)) {
            dataFileWriter.create(obj.getSchema(), baos);
            dataFileWriter.append(obj);
            dataFileWriter.flush();
            baos.flush();
            bytes = baos.toByteArray();
        }
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
        return bytes;
    }

Потребитель:

public class DefaultRecordProcessor implements IRecordProcessor {
  @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        System.out.println("Processing " + records.size() + " records");
        List<String> data = records.stream()
                .map(Record::getData)
                .map(d -> new String(d.array(), Charset.forName("UTF-8")))
                .map(this::convert)
                .collect(toList());
    }

private GenericData convert(String data) {
 SpecificData specificData = new SpecificData();

           GenericData result = null;
            DatumReader<GenericData> datumReader = new SpecificDatumReader<>(null, schema, specificData);
            DataFileReader<GenericData> dataFileReader = new DataFileReader<>(new SeekableByteArrayInput(data.getBytes()), datumReader);
            while (dataFileReader.hasNext()) {
                result = dataFileReader.next(result);
            }
            return result;
}

}

Трассировки стека:

java.lang.IllegalArgumentException: null
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.avro.io.BinaryDecoder.readBytes(BinaryDecoder.java:288)
    at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:112)
    at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:97)

Я обнаружил очень похожую проблему, но она мне не помогла - Ошибка при запросе таблицы кустов с авро-поддержкой: java.lang.IllegalArgumentException

ОБНОВЛЕНИЕ: но если я не передаю данные через поток Kinesis, он работает правильно:

GenericContainer data = User.newBuilder().setName("Igor").build();

        // write
        DatumWriter datumWriter = new SpecificDatumWriter<>(data.getSchema(), new SpecificData());
        byte[] bytes;
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataFileWriter dataFileWriter = new DataFileWriter(datumWriter)) {
            dataFileWriter.create(data.getSchema(), baos);
            dataFileWriter.append(data);
            dataFileWriter.flush();
            baos.flush();
            bytes = baos.toByteArray();
        }
        System.out.println(new String(bytes, StandardCharsets.UTF_8));

        // read
        SpecificData specificData = new SpecificData();
        Object result = null;
        DatumReader datumReader = new SpecificDatumReader<>(null, data.getSchema(), specificData);
        DataFileReader dataFileReader = new DataFileReader<>(new SeekableByteArrayInput(bytes), datumReader);
        while (dataFileReader.hasNext()) {
            result = dataFileReader.next(result);
        }
        System.out.println(result);

ВЫХОД:

bjavro.schema�{"type":"record","name":"User","namespace":"avro","doc":"User schema","fields":[{"name":"name","type":{"type":"string","avro.java.string":"String"}}]}������ &�4^�[l

    Igor������ &�4^�[l
    {"name": "Igor"}

но с другой стороны, если я установлю данные напрямую:

bytes = "Obj\u0001\u0002\u0016avro.schema�\u0002{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"avro\",\"doc\":\"User schema\",\"fields\":[{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}\u0000z4v܉I��PJ19���#\u0002\n".getBytes();

Я все еще получаю ошибку

1 ответ

Проблема возникла из-за преобразования байта [] в строку.

List<String> data = records.stream()
                .map(Record::getData)
                .map(d -> new String(d.array(), Charset.forName("UTF-8")))
                .map(this::convert)
                .collect(toList());

map (d -> new String (d.array (), Charset.forName ("UTF-8")))

Я удалил эту часть и обработал данные как byte[], и это работает.

Другие вопросы по тегам