Десериализация Avro из Kafka с использованием fastavro
Я создаю приложение, которое получает данные от Kafka. При использовании стандартной библиотеки avro, предоставленной Apache ( https://pypi.org/project/avro-python3/), результаты верны, однако процесс десериализации очень медленный.
class KafkaReceiver:
data = {}
def __init__(self, bootstrap='192.168.1.111:9092'):
self.client = KafkaConsumer(
'topic',
bootstrap_servers=bootstrap,
client_id='app',
api_version=(0, 10, 1)
)
self.schema = avro.schema.parse(open("Schema.avsc", "rb").read())
self.reader = avro.io.DatumReader(self.schema)
def do(self):
for msg in self.client:
bytes_reader = io.BytesIO(msg.value)
decoder = BinaryDecoder(bytes_reader)
self.data = self.reader.read(decoder)
Читая, почему это так медленно, я обнаружил fastavro
что должно быть намного быстрее. Я использую это так:
def do(self):
schema = fastavro.schema.load_schema('Schema.avsc')
for msg in self.client:
bytes_reader = io.BytesIO(msg.value)
bytes_reader.seek(0)
for record in reader(bytes_reader, schema):
self.data = record
И поскольку все работает при использовании библиотеки Apache, я ожидал, что все будет работать так же с fastavro
. Однако при запуске я получаю
File "fastavro/_read.pyx", line 389, in fastavro._read.read_map
File "fastavro/_read.pyx", line 290, in fastavro._read.read_utf8
File "fastavro/_six.pyx", line 22, in fastavro._six.py3_btou
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xfc in position 3: invalid start byte
Обычно я не программирую на Python, поэтому не знаю, как к этому подойти. Есть идеи?
1 ответ
В fastavro.reader
ожидает формат файла avro, который включает заголовок. Похоже, у вас есть сериализованная запись без заголовка. Я думаю, вы сможете прочитать это, используя fastavro.schemaless_reader
.
Так что вместо:
for record in reader(bytes_reader, schema):
self.data = record
Вы бы сделали:
self.data = schemaless_reader(bytes_reader, schema)