Python asyncio и httpx
Я новичок в асинхронном программировании и играл с httpx. У меня есть следующий код, и я уверен, что делаю что-то не так - просто не знаю, что это такое. Есть два метода: синхронный и асинхронный. Они оба взяты из финансов Google. В моей системе я вижу потраченное время следующим образом:
Асинхронный: 5.015218734741211
Синхронный: 5.173618316650391
Вот код:
import httpx
import asyncio
import time
#
#--------------------------------------------------------------------
#
#--------------------------------------------------------------------
#
def sync_pull(url):
r = httpx.get(url)
print(r.status_code)
#
#--------------------------------------------------------------------
#
#--------------------------------------------------------------------
#
async def async_pull(url):
async with httpx.AsyncClient() as client:
r = await client.get(url)
print(r.status_code)
#
#--------------------------------------------------------------------
#
#--------------------------------------------------------------------
#
if __name__ == "__main__":
goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL',
'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
]
print("Running asynchronously...")
async_start = time.time()
for ticker in tickers:
url = goog_fin_nyse_url + ticker + ':NYSE'
asyncio.run(async_pull(url))
async_end = time.time()
print(f"Time lapsed is: {async_end - async_start}")
print("Running synchronously...")
sync_start = time.time()
for ticker in tickers:
url = goog_fin_nyse_url + ticker + ':NYSE'
sync_pull(url)
sync_end = time.time()
print(f"Time lapsed is: {sync_end - sync_start}")
Я надеялся, что асинхронный метод потребует меньше времени, чем синхронный подход. Что я делаю неправильно?
3 ответа
Когда ты сказал
asyncio.run(async_pull)
вы говорите запустить async_pull и дождаться возврата результата. Поскольку вы делаете это один раз для каждого тикера в своем цикле, вы, по сути, используете asyncio для синхронного запуска и не увидите выигрыша в производительности.
Что вам нужно сделать, так это создать несколько асинхронных вызовов и запускать их одновременно. Есть несколько способов сделать это, самый простой - использовать (см. Https://docs.python.org/3/library/asyncio-task.html#asyncio.gather ), который принимает последовательность сопрограмм и запускает их одновременно. . Адаптировать код довольно просто: вы создаете асинхронную функцию для получения списка URL-адресов, а затем вызываете
async_pull
по каждому из них, а затем передать это
asyncio.gather
и жду результатов. Адаптация вашего кода к этому выглядит следующим образом:
import httpx
import asyncio
import time
def sync_pull(url):
r = httpx.get(url)
print(r.status_code)
async def async_pull(url):
async with httpx.AsyncClient() as client:
r = await client.get(url)
print(r.status_code)
async def async_pull_all(urls):
return await asyncio.gather(*[async_pull(url) for url in urls])
if __name__ == "__main__":
goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL',
'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
]
print("Running asynchronously...")
async_start = time.time()
results = asyncio.run(async_pull_all([goog_fin_nyse_url + ticker + ':NYSE' for ticker in tickers]))
async_end = time.time()
print(f"Time lapsed is: {async_end - async_start}")
print("Running synchronously...")
sync_start = time.time()
for ticker in tickers:
url = goog_fin_nyse_url + ticker + ':NYSE'
sync_pull(url)
sync_end = time.time()
print(f"Time lapsed is: {sync_end - sync_start}")
Таким образом, асинхронная версия выполняется для меня примерно за секунду, в отличие от семи синхронных.
Вот хороший шаблон, который я использую (я стараюсь каждый раз немного его менять). В общем делаю модуль
async_utils.py
и просто импортируйте функцию выборки верхнего уровня (например, здесь
fetch_things
), и тогда мой код может забыть о внутреннем устройстве (кроме обработки ошибок). Вы можете сделать это другими способами, но мне нравится «функциональный» стиль aiostream, и я часто обнаруживаю, что повторяющиеся вызовы функции процесса принимают определенные значения по умолчанию, которые я установил с помощьюfunctools.partial
.
Вы можете пройти через
tqdm.tqdm
индикатор выполнения до
pbar
(инициализируется известным размером
total=len(things)
), чтобы он обновлялся при обработке каждого асинхронного ответа.
import asyncio
import httpx
from aiostream import stream
from functools import partial
__all__ = ["fetch", "process", "async_fetch_urlset", "fetch_things"]
async def fetch(session, url, raise_for_status=False):
response = await session.get(str(url))
if raise_for_status:
response.raise_for_status()
return response
async def process_thing(data, things, pbar=None, verbose=False):
# Map the response back to the thing it came from in the things list
source_url = data.history[0].url if data.history else data.url
thing = next(t for t in things if source_url == t.get("thing_url"))
# Handle `data.content` here, where `data` is the `httpx.Response`
if verbose:
print(f"Processing {source_url=}")
build.update({"computed_value": "result goes here"})
if pbar:
pbar.update()
async def async_fetch_urlset(urls, things, pbar=None, verbose=False, timeout_s=10.0):
timeout = httpx.Timeout(timeout=timeout_s)
async with httpx.AsyncClient(timeout=timeout) as session:
ws = stream.repeat(session)
xs = stream.zip(ws, stream.iterate(urls))
ys = stream.starmap(xs, fetch, ordered=False, task_limit=20)
process = partial(process_thing, things=things, pbar=pbar, verbose=verbose)
zs = stream.map(ys, process)
return await zs
def fetch_things(urls, things, pbar=None, verbose=False):
return asyncio.run(async_fetch_urlset(urls, things, pbar, verbose))
В этом примере входные данные - это список словарей (со строковыми ключами и значениями),
things: list[dict[str,str]]
, а ключ
"thing_url"
обращается для получения URL. Желательно иметь dict или объект, а не просто строку URL-адреса, когда вы хотите «сопоставить» результат с объектом, из которого он пришел. В
process_thing
функция может изменять список ввода
things
на месте (т.е. любые изменения не входят в область видимости функции, они изменяют ее обратно в области видимости, которая ее вызвала).
Вы часто будете обнаруживать ошибки, возникающие во время асинхронных запусков, которые вы не получаете при синхронном запуске, поэтому вам нужно будет их отловить и повторить попытку. Распространенная проблема - это попытка повторить попытку на неправильном уровне (например, по всему циклу)
В частности, вы захотите импортировать и поймать
httpcore.ConnectTimeout
,
httpx.ConnectTimeout
,
httpx.RemoteProtocolError
, и
httpx.ReadTimeout
.
Увеличение
timeout_s
параметр уменьшит частоту ошибок тайм-аута, позволяя AsyncClient «ждать» дольше, но это может фактически замедлить вашу программу (она не будет так быстро «терпеть неудачу»).
Вот пример того, как использовать
async_utils
модуль, указанный выше:
from async_utils import fetch_things
import httpx
import httpcore
# UNCOMMENT THIS TO SEE ALL THE HTTPX INTERNAL LOGGING
#import logging
#log = logging.getLogger()
#log.setLevel(logging.DEBUG)
#log_format = logging.Formatter('[%(asctime)s] [%(levelname)s] - %(message)s')
#console = logging.StreamHandler()
#console.setLevel(logging.DEBUG)
#console.setFormatter(log_format)
#log.addHandler(console)
things = [
{"url": "https://python.org", "name": "Python"},
{"url": "https://www.python-httpx.org/", "name": "HTTPX"},
]
#log.debug("URLSET:" + str(list(t.get("url") for t in things)))
def make_urlset(things):
"""Make a URL generator (empty if all have been fetched)"""
urlset = (t.get("url") for t in things if "computed_value" not in t)
return urlset
retryable_errors = (
httpcore.ConnectTimeout,
httpx.ConnectTimeout, httpx.RemoteProtocolError, httpx.ReadTimeout,
)
# ASYNCHRONOUS:
max_retries = 100
for i in range(max_retries):
print(f"Retry {i}")
try:
urlset = make_urlset(things)
foo = fetch_things(urls=urlset, things=things, verbose=True)
except retryable_errors as exc:
print(f"Caught {exc!r}")
if i == max_retries - 1:
raise
except Exception:
raise
# SYNCHRONOUS:
#for t in things:
# resp = httpx.get(t["url"])
В этом примере я установил ключ
"computed_value"
в словаре после успешной обработки асинхронного ответа, который затем предотвращает ввод этого URL-адреса в генератор в следующем раунде (когда
make_urlset
вызывается снова). Таким образом, генератор становится все меньше и меньше. Вы также можете сделать это со списками, но я считаю, что генератор извлекаемых URL-адресов работает надежно. Для объекта вы должны изменить назначение клавиш словаря / доступ (
update
/
in
) для присвоения атрибута / доступа (
settatr
/
hasattr
).
Я хотел опубликовать рабочую версию кодирования с использованием фьючерсов - практически то же время выполнения:
import httpx
import asyncio
import time
#
#--------------------------------------------------------------------
# Synchronous pull
#--------------------------------------------------------------------
#
def sync_pull(url):
r = httpx.get(url)
print(r.status_code)
#
#--------------------------------------------------------------------
# Asynchronous Pull
#--------------------------------------------------------------------
#
async def async_pull(url):
async with httpx.AsyncClient() as client:
r = await client.get(url)
print(r.status_code)
#
#--------------------------------------------------------------------
# Build tasks queue & execute coroutines
#--------------------------------------------------------------------
#
async def build_task() -> None:
goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL',
'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
]
tasks= []
#
## Following block of code will create a queue full of function
## call
for ticker in tickers:
url = goog_fin_nyse_url + ticker + ':NYSE'
tasks.append(asyncio.ensure_future(async_pull(url)))
start_time = time.time()
#
## This block of code will derefernce the function calls
## from the queue, which will cause them all to run
## rapidly
await asyncio.gather(*tasks)
#
## Calculate time lapsed
finish_time = time.time()
elapsed_time = finish_time - start_time
print(f"\n Time spent processing: {elapsed_time} ")
# Start from here
if __name__ == "__main__":
asyncio.run(build_task())