производитель aiokafka, работающий на одном ядре в контейнере python:buster Docker, обеспечивает очень низкую пропускную способность 1 МиБ в секунду

У меня есть простой код для проверки производительности библиотеки. Я использую компьютер с Windows, на котором запущен Docker для Windows и виртуальная машина с 8 ядрами.

В этой ситуации библиотека, похоже, обеспечивает очень низкую пропускную способность производителя, около 1 Мбайт в секунду:

      async def send_many():
  f = bigass_object_factory.BigAssObjectFactory()
  v = f.create_bytearray()
  print(f'{len(v)} length {type(v)} named v')

  producer = AIOKafkaProducer(
    bootstrap_servers='kafka:9092',
    loop=asyncio.get_event_loop(),
    acks=0,
  )

  pr = cProfile.Profile()
  pr.enable()

  await producer.start()

  total_sent = 0
  started = time.time()

  for i in range(100):
    await producer.send('test_topic_aiokafka', value=v)
    print(f'sent {i}')


  await producer.stop()

  pr.disable()
  s = io.StringIO()
  sortby = 'cumtime'
  ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
  ps.print_stats()
  print(s.getvalue())

def send_many_main():
  asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
  asyncio.get_event_loop().run_until_complete(send_many())

p1 = Process(target=send_many_main)
p1.start()
p1.join()

Это занимает 60+ секунд для запуска на моем компьютере, IE 60+ секунд, чтобы отправить 100 сообщений 1 MiB. Я знаю, что это большое сообщение для Кафки, но это смешно, и я совершенно уверен, что это не должно быть выступлением. Когда я использую профиль cProfile я вижу это

         ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      169    0.003    0.000   70.619    0.418 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:260(_send_produce_req)
      169    0.008    0.000   70.615    0.418 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:732(do)
      100    0.015    0.000   70.132    0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:701(create_request)
      100    0.003    0.000   70.103    0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/message_accumulator.py:236(get_data_buffer)
      100    0.001    0.000   70.101    0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/message_accumulator.py:83(_build)
      100    0.000    0.000   70.099    0.701 /usr/local/lib/python3.9/site-packages/aiokafka/record/default_records.py:526(build)
      100    0.119    0.001   70.099    0.701 /usr/local/lib/python3.9/site-packages/aiokafka/record/default_records.py:482(write_header)
      100    0.000    0.000   69.968    0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/util.py:115(calc_crc32c_py)
      100    0.001    0.000   69.968    0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/_crc32c.py:130(crc)
      100   69.967    0.700   69.967    0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/_crc32c.py:100(crc_update)
      169    0.004    0.000    0.473    0.003 /usr/local/lib/python3.9/site-packages/aiokafka/client.py:460(send)
      104    0.104    0.001    0.466    0.004 /usr/local/lib/python3.9/site-packages/aiokafka/conn.py:374(send)
      208    0.001    0.000    0.294    0.001 /usr/local/lib/python3.9/site-packages/kafka/util.py:155(__call__)
      208    0.004    0.000    0.293    0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/struct.py:40(_encode_self)
  408/208    0.025    0.000    0.289    0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:143(encode)
      107    0.002    0.000    0.270    0.003 /opt/project/tests/contrib/aiokafka/test_aiokafka_performance.py:34(send_many)
  408/208    0.003    0.000    0.237    0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:146(<listcomp>)
  202/102    0.002    0.000    0.233    0.002 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:181(encode)

Так что он тратит все свое время в crc32c.py. Когда я изучил этот код, я пришел к выводу, что невозможно избежать вызова этого кода на стороне производителя. (Очевидно, если вы не можете сказать, что я не эксперт Kafka и понятия не имею, зачем нужна эта проверка).

Но кажется возможным, что причина такой медленной работы в том, что он использует версию Python, а не собственную версию. По крайней мере, я на это надеюсь ...

Итак, вопросы:

  1. Означает ли это, что проблема?
  2. Как мне убедиться, что я использую родной crc32c ?
  3. Кто-нибудь видел это в отношении использования aiokafka на python:buster-3.9.x до?

0 ответов

Другие вопросы по тегам