Python asyncio и httpx

Я новичок в асинхронном программировании и играл с httpx. У меня есть следующий код, и я уверен, что делаю что-то не так - просто не знаю, что это такое. Есть два метода: синхронный и асинхронный. Они оба взяты из финансов Google. В моей системе я вижу потраченное время следующим образом:

Асинхронный: 5.015218734741211
Синхронный: 5.173618316650391

Вот код:

      
import httpx
import asyncio
import time



#
#--------------------------------------------------------------------
#
#--------------------------------------------------------------------
#
def sync_pull(url):
  r = httpx.get(url)
  print(r.status_code)


#
#--------------------------------------------------------------------
#
#--------------------------------------------------------------------
#
async def async_pull(url):
  async with httpx.AsyncClient() as client:
    r = await client.get(url)
    print(r.status_code)


#
#--------------------------------------------------------------------
#
#--------------------------------------------------------------------
#
if __name__ == "__main__":

  goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
  tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL', 
             'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
             ]  

  print("Running asynchronously...")
  async_start = time.time()
  for ticker in tickers:
    url = goog_fin_nyse_url + ticker + ':NYSE'
    asyncio.run(async_pull(url))
  async_end = time.time()
  print(f"Time lapsed is: {async_end - async_start}")


  print("Running synchronously...")
  sync_start = time.time()
  for ticker in tickers:
    url = goog_fin_nyse_url + ticker + ':NYSE'
    sync_pull(url)
  sync_end = time.time()
  print(f"Time lapsed is: {sync_end - sync_start}")

Я надеялся, что асинхронный метод потребует меньше времени, чем синхронный подход. Что я делаю неправильно?

3 ответа

Когда ты сказал asyncio.run(async_pull)вы говорите запустить async_pull и дождаться возврата результата. Поскольку вы делаете это один раз для каждого тикера в своем цикле, вы, по сути, используете asyncio для синхронного запуска и не увидите выигрыша в производительности.

Что вам нужно сделать, так это создать несколько асинхронных вызовов и запускать их одновременно. Есть несколько способов сделать это, самый простой - использовать (см. Https://docs.python.org/3/library/asyncio-task.html#asyncio.gather ), который принимает последовательность сопрограмм и запускает их одновременно. . Адаптировать код довольно просто: вы создаете асинхронную функцию для получения списка URL-адресов, а затем вызываете async_pull по каждому из них, а затем передать это asyncio.gatherи жду результатов. Адаптация вашего кода к этому выглядит следующим образом:

      import httpx
import asyncio
import time

def sync_pull(url):
    r = httpx.get(url)
    print(r.status_code)

async def async_pull(url):
    async with httpx.AsyncClient() as client:
        r = await client.get(url)
        print(r.status_code)


async def async_pull_all(urls):
    return await asyncio.gather(*[async_pull(url) for url in urls])

if __name__ == "__main__":

    goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
    tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL',
           'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
           ]

    print("Running asynchronously...")
    async_start = time.time()
    results = asyncio.run(async_pull_all([goog_fin_nyse_url + ticker + ':NYSE' for ticker in tickers]))
    async_end = time.time()
    print(f"Time lapsed is: {async_end - async_start}")


    print("Running synchronously...")
    sync_start = time.time()
    for ticker in tickers:
        url = goog_fin_nyse_url + ticker + ':NYSE'
        sync_pull(url)
    sync_end = time.time()
    print(f"Time lapsed is: {sync_end - sync_start}")

Таким образом, асинхронная версия выполняется для меня примерно за секунду, в отличие от семи синхронных.

Вот хороший шаблон, который я использую (я стараюсь каждый раз немного его менять). В общем делаю модуль async_utils.py и просто импортируйте функцию выборки верхнего уровня (например, здесь fetch_things), и тогда мой код может забыть о внутреннем устройстве (кроме обработки ошибок). Вы можете сделать это другими способами, но мне нравится «функциональный» стиль aiostream, и я часто обнаруживаю, что повторяющиеся вызовы функции процесса принимают определенные значения по умолчанию, которые я установил с помощьюfunctools.partial.

Вы можете пройти через tqdm.tqdm индикатор выполнения до pbar (инициализируется известным размером total=len(things)), чтобы он обновлялся при обработке каждого асинхронного ответа.

      import asyncio
import httpx
from aiostream import stream
from functools import partial

__all__ = ["fetch", "process", "async_fetch_urlset", "fetch_things"]

async def fetch(session, url, raise_for_status=False):
    response = await session.get(str(url))
    if raise_for_status:
        response.raise_for_status()
    return response


async def process_thing(data, things, pbar=None, verbose=False):
    # Map the response back to the thing it came from in the things list
    source_url = data.history[0].url if data.history else data.url
    thing = next(t for t in things if source_url == t.get("thing_url"))
    # Handle `data.content` here, where `data` is the `httpx.Response`
    if verbose:
        print(f"Processing {source_url=}")
    build.update({"computed_value": "result goes here"})
    if pbar:
        pbar.update()


async def async_fetch_urlset(urls, things, pbar=None, verbose=False, timeout_s=10.0):
    timeout = httpx.Timeout(timeout=timeout_s)
    async with httpx.AsyncClient(timeout=timeout) as session:
        ws = stream.repeat(session)
        xs = stream.zip(ws, stream.iterate(urls))
        ys = stream.starmap(xs, fetch, ordered=False, task_limit=20)
        process = partial(process_thing, things=things, pbar=pbar, verbose=verbose)
        zs = stream.map(ys, process)
        return await zs

def fetch_things(urls, things, pbar=None, verbose=False):
    return asyncio.run(async_fetch_urlset(urls, things, pbar, verbose))

В этом примере входные данные - это список словарей (со строковыми ключами и значениями), things: list[dict[str,str]], а ключ "thing_url"обращается для получения URL. Желательно иметь dict или объект, а не просто строку URL-адреса, когда вы хотите «сопоставить» результат с объектом, из которого он пришел. В process_thing функция может изменять список ввода things на месте (т.е. любые изменения не входят в область видимости функции, они изменяют ее обратно в области видимости, которая ее вызвала).

Вы часто будете обнаруживать ошибки, возникающие во время асинхронных запусков, которые вы не получаете при синхронном запуске, поэтому вам нужно будет их отловить и повторить попытку. Распространенная проблема - это попытка повторить попытку на неправильном уровне (например, по всему циклу)

В частности, вы захотите импортировать и поймать httpcore.ConnectTimeout, httpx.ConnectTimeout, httpx.RemoteProtocolError, и httpx.ReadTimeout.

Увеличение timeout_s параметр уменьшит частоту ошибок тайм-аута, позволяя AsyncClient «ждать» дольше, но это может фактически замедлить вашу программу (она не будет так быстро «терпеть неудачу»).

Вот пример того, как использовать async_utils модуль, указанный выше:

      from async_utils import fetch_things
import httpx
import httpcore

# UNCOMMENT THIS TO SEE ALL THE HTTPX INTERNAL LOGGING
#import logging
#log = logging.getLogger()
#log.setLevel(logging.DEBUG)
#log_format = logging.Formatter('[%(asctime)s] [%(levelname)s] - %(message)s')
#console = logging.StreamHandler()
#console.setLevel(logging.DEBUG)
#console.setFormatter(log_format)
#log.addHandler(console)

things = [
    {"url": "https://python.org", "name": "Python"},
    {"url": "https://www.python-httpx.org/", "name": "HTTPX"},
]
#log.debug("URLSET:" + str(list(t.get("url") for t in things)))

def make_urlset(things):
    """Make a URL generator (empty if all have been fetched)"""
    urlset = (t.get("url") for t in things if "computed_value" not in t)
    return urlset

retryable_errors = (
    httpcore.ConnectTimeout,
    httpx.ConnectTimeout, httpx.RemoteProtocolError, httpx.ReadTimeout,
)

# ASYNCHRONOUS:
max_retries = 100
for i in range(max_retries):
    print(f"Retry {i}")
    try:
        urlset = make_urlset(things)
        foo = fetch_things(urls=urlset, things=things, verbose=True)
    except retryable_errors as exc:
        print(f"Caught {exc!r}")
        if i == max_retries - 1:
            raise
    except Exception:
        raise

# SYNCHRONOUS:
#for t in things:
#    resp = httpx.get(t["url"])

В этом примере я установил ключ "computed_value" в словаре после успешной обработки асинхронного ответа, который затем предотвращает ввод этого URL-адреса в генератор в следующем раунде (когда make_urlsetвызывается снова). Таким образом, генератор становится все меньше и меньше. Вы также можете сделать это со списками, но я считаю, что генератор извлекаемых URL-адресов работает надежно. Для объекта вы должны изменить назначение клавиш словаря / доступ ( update/ in) для присвоения атрибута / доступа ( settatr/ hasattr).

Я хотел опубликовать рабочую версию кодирования с использованием фьючерсов - практически то же время выполнения:

      
import httpx
import asyncio
import time

#
#--------------------------------------------------------------------
# Synchronous pull
#--------------------------------------------------------------------
#
def sync_pull(url):
  r = httpx.get(url)
  print(r.status_code)

#
#--------------------------------------------------------------------
# Asynchronous Pull
#--------------------------------------------------------------------
#
async def async_pull(url):
  async with httpx.AsyncClient() as client:
    r = await client.get(url)
    print(r.status_code)

#
#--------------------------------------------------------------------
# Build tasks queue & execute coroutines
#--------------------------------------------------------------------
#
async def build_task() -> None:
  goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
  tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL', 
             'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
             ]  
  tasks= []

  #
  ## Following block of code will create a queue full of function 
  ## call
  for ticker in tickers:
    url = goog_fin_nyse_url + ticker + ':NYSE'
    tasks.append(asyncio.ensure_future(async_pull(url)))

  start_time = time.time()

  #
  ## This block of code will derefernce the function calls
  ## from the queue, which will cause them all to run
  ## rapidly
  await asyncio.gather(*tasks)

  #
  ## Calculate time lapsed
  finish_time = time.time()
  elapsed_time = finish_time - start_time
  print(f"\n Time spent processing: {elapsed_time} ")


# Start from here
if __name__ == "__main__":
  asyncio.run(build_task())

Другие вопросы по тегам