производитель aiokafka, работающий на одном ядре в контейнере python:buster Docker, обеспечивает очень низкую пропускную способность 1 МиБ в секунду
У меня есть простой код для проверки производительности библиотеки. Я использую компьютер с Windows, на котором запущен Docker для Windows и виртуальная машина с 8 ядрами.
В этой ситуации библиотека, похоже, обеспечивает очень низкую пропускную способность производителя, около 1 Мбайт в секунду:
async def send_many():
f = bigass_object_factory.BigAssObjectFactory()
v = f.create_bytearray()
print(f'{len(v)} length {type(v)} named v')
producer = AIOKafkaProducer(
bootstrap_servers='kafka:9092',
loop=asyncio.get_event_loop(),
acks=0,
)
pr = cProfile.Profile()
pr.enable()
await producer.start()
total_sent = 0
started = time.time()
for i in range(100):
await producer.send('test_topic_aiokafka', value=v)
print(f'sent {i}')
await producer.stop()
pr.disable()
s = io.StringIO()
sortby = 'cumtime'
ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
ps.print_stats()
print(s.getvalue())
def send_many_main():
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
asyncio.get_event_loop().run_until_complete(send_many())
p1 = Process(target=send_many_main)
p1.start()
p1.join()
Это занимает 60+ секунд для запуска на моем компьютере, IE 60+ секунд, чтобы отправить 100 сообщений 1 MiB. Я знаю, что это большое сообщение для Кафки, но это смешно, и я совершенно уверен, что это не должно быть выступлением. Когда я использую профиль
cProfile
я вижу это
ncalls tottime percall cumtime percall filename:lineno(function)
169 0.003 0.000 70.619 0.418 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:260(_send_produce_req)
169 0.008 0.000 70.615 0.418 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:732(do)
100 0.015 0.000 70.132 0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:701(create_request)
100 0.003 0.000 70.103 0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/message_accumulator.py:236(get_data_buffer)
100 0.001 0.000 70.101 0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/message_accumulator.py:83(_build)
100 0.000 0.000 70.099 0.701 /usr/local/lib/python3.9/site-packages/aiokafka/record/default_records.py:526(build)
100 0.119 0.001 70.099 0.701 /usr/local/lib/python3.9/site-packages/aiokafka/record/default_records.py:482(write_header)
100 0.000 0.000 69.968 0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/util.py:115(calc_crc32c_py)
100 0.001 0.000 69.968 0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/_crc32c.py:130(crc)
100 69.967 0.700 69.967 0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/_crc32c.py:100(crc_update)
169 0.004 0.000 0.473 0.003 /usr/local/lib/python3.9/site-packages/aiokafka/client.py:460(send)
104 0.104 0.001 0.466 0.004 /usr/local/lib/python3.9/site-packages/aiokafka/conn.py:374(send)
208 0.001 0.000 0.294 0.001 /usr/local/lib/python3.9/site-packages/kafka/util.py:155(__call__)
208 0.004 0.000 0.293 0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/struct.py:40(_encode_self)
408/208 0.025 0.000 0.289 0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:143(encode)
107 0.002 0.000 0.270 0.003 /opt/project/tests/contrib/aiokafka/test_aiokafka_performance.py:34(send_many)
408/208 0.003 0.000 0.237 0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:146(<listcomp>)
202/102 0.002 0.000 0.233 0.002 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:181(encode)
Так что он тратит все свое время в
crc32c.py
. Когда я изучил этот код, я пришел к выводу, что невозможно избежать вызова этого кода на стороне производителя. (Очевидно, если вы не можете сказать, что я не эксперт Kafka и понятия не имею, зачем нужна эта проверка).
Но кажется возможным, что причина такой медленной работы в том, что он использует версию Python, а не собственную версию. По крайней мере, я на это надеюсь ...
Итак, вопросы:
- Означает ли это, что проблема?
- Как мне убедиться, что я использую родной
crc32c
? - Кто-нибудь видел это в отношении использования
aiokafka
наpython:buster-3.9.x
до?