Параллельные запросы блокируются бесконечно после ровно 100 запросов с использованием asyncio

Я пробовал использовать и httpx, и aiohttp, и оба имеют этот жестко заданный предел.

import asyncio

import aiohttp
import httpx


async def main():
    client = aiohttp.ClientSession() 
    # client = httpx.AsyncClient(timeout=None)

    coros = [
        client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",
            params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d",},
        )
        for _ in range(500)
    ]

    for i, coro in enumerate(asyncio.as_completed(coros)):
        await coro
        print(i, end=", ")


asyncio.run(main())

Выход -

0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99

И он просто застрял на 99 с обеими библиотеками

Однако этого не происходит, если для каждого запроса используется новый сеанс.

Что я делаю неправильно? Разве весь смысл asyncio не в том, чтобы упростить подобные вещи?


Я попытался переписать это с помощью потоков, zmq и запросов, и он отлично работает -

import zmq

N_WORKERS = 100
N_ITERS = 500

ctx = zmq.Context.instance()


def worker():
    client = requests.Session()

    pull = ctx.socket(zmq.PULL)
    pull.connect("inproc://#1")

    push = ctx.socket(zmq.PUSH)
    push.connect("inproc://#2")

    while True:
        if not pull.recv_pyobj():
            return

        r = client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",
            params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d",},
        )
        push.send_pyobj(r.content)


def ventilator():
    push = ctx.socket(zmq.PUSH)
    push.bind("inproc://#1")

    # distribute tasks to all workers
    for _ in range(N_ITERS):
        push.send_pyobj(True)

    # close down workers
    for _ in range(N_WORKERS):
        push.send_pyobj(False)



# start workers & ventilator
threads = [Thread(target=worker) for _ in range(N_WORKERS)]
threads.append(Thread(target=ventilator))
for t in threads:
    t.start()

# pull results from workers
pull = ctx.socket(zmq.PULL)
pull.bind("inproc://#2")

for i in range(N_ITERS):
    pull.recv_pyobj()
    print(i, end=", ")

# wait for workers to exit
for t in threads:
    t.join()

1 ответ

Решение

Проблема в том, что ты client.get(...)возвращает объект запроса с активным дескриптором в сокет уровня ОС. Неспособность закрыть этот объект приводит к тому, что aiohttp исчерпывает сокеты, то есть достигает предела соединителя, который по умолчанию равен 100.

Чтобы решить проблему, вам нужно закрыть объект, возвращаемый client.get(), или используйте async with что гарантирует, что объект закроется, как только withблок сделан. Например:

async def get(client):
    async with client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",
            params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d",}) as resp:
        pass

async def main():
    async with aiohttp.ClientSession() as client:
        coros = [get(client) for _ in range(500)]
        for i, coro in enumerate(asyncio.as_completed(coros)):
            await coro
            print(i, end=", ", flush=True)

asyncio.run(main())

Кроме того, aiohttp.ClientSession объект также должен быть закрыт, что также можно сделать с помощью async with, как показано выше.

Другие вопросы по тегам