Асинхронные запросы с запросами Python

Я попробовал пример, представленный в документации библиотеки запросов для Python:

http://docs.python-requests.org/en/latest/user/advanced/

с async.map(rs) Я получаю коды ответов, но хочу получить содержимое каждой запрошенной страницы.

out = async.map(rs)
print out[0].content

например просто не работает.

16 ответов

Решение

Заметка

Приведенный ниже ответ не относится к запросам v0.13.0+. Асинхронная функциональность была перенесена в греквесты после того, как этот вопрос был написан. Тем не менее, вы можете просто заменить requests с grequests ниже и должно работать.

Я оставил этот ответ, чтобы отразить исходный вопрос, который был об использовании запросов


Для выполнения нескольких задач с async.map асинхронно вы должны:

  1. Определите функцию для того, что вы хотите сделать с каждым объектом (ваша задача)
  2. Добавьте эту функцию в качестве обработчика событий в вашем запросе
  3. Вызов async.map в списке всех запросов / действий

Пример:

from requests import async
# If using requests > v0.13.0, use
# from grequests import async

urls = [
    'http://python-requests.org',
    'http://httpbin.org',
    'http://python-guide.org',
    'http://kennethreitz.com'
]

# A simple task to do to each response object
def do_something(response):
    print response.url

# A list to hold our things to do via async
async_list = []

for u in urls:
    # The "hooks = {..." part is where you define what you want to do
    # 
    # Note the lack of parentheses following do_something, this is
    # because the response will be used as the first argument automatically
    action_item = async.get(u, hooks = {'response' : do_something})

    # Add the task to our list of things to do via async
    async_list.append(action_item)

# Do our list of things to do via async
async.map(async_list)

async теперь независимый модуль: grequests,

Смотрите здесь: https://github.com/kennethreitz/grequests

И еще: идеальный метод для отправки нескольких HTTP-запросов через Python?

монтаж:

$ pip install grequests

использование:

построить стек:

import grequests

urls = [
    'http://www.heroku.com',
    'http://tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://kennethreitz.com'
]

rs = (grequests.get(u) for u in urls)

отправить стек

grequests.map(rs)

результат выглядит

[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

Греквесты, по-видимому, не устанавливают ограничения для одновременных запросов, т.е. когда несколько запросов отправляются на один и тот же сервер.

Я проверил оба запроса-фьючерсы и грекеты. Grequests быстрее, но приносит исправления обезьян и дополнительные проблемы с зависимостями. Фьючерсы на запросы в несколько раз медленнее, чем грекеты. Я решил написать свои собственные и просто упакованные запросы в ThreadPollExecutor, и это было почти так же быстро, как и запросы, но без внешних зависимостей.

import requests
import concurrent.futures

def get_urls():
    return ["url1","url2"]

def load_url(url, timeout):
    return requests.get(url, timeout = timeout)

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:

    future_to_url = {executor.submit(load_url, url, 10): url for url in     get_urls()}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            resp_err = resp_err + 1
        else:
            resp_ok = resp_ok + 1

К сожалению, насколько мне известно, библиотека запросов не приспособлена для выполнения асинхронных запросов. Вы можете обернутьasync/await синтаксис вокруг requests, но это сделает базовые запросы не менее синхронными. Если вам нужны истинные асинхронные запросы, вы должны использовать другие инструменты, которые его предоставляют. Одно из таких решений - aiohttp(Python 3.5.3+). По моему опыту, он хорошо работает с Python 3.7.async/awaitсинтаксис. Ниже я напишу три реализации выполнения n веб-запросов с использованием

  1. Чисто синхронные запросы (sync_requests_get_all) с помощью Python requests библиотека
  2. Синхронные запросы (async_requests_get_all) с помощью Python requests библиотека, завернутая в Python 3.7async/await синтаксис и asyncio
  3. Поистине асинхронная реализация (async_aiohttp_get_all) с Python aiohttp библиотека, завернутая в Python 3.7async/await синтаксис и asyncio
import time
import asyncio
import requests
import aiohttp

from types import SimpleNamespace

durations = []


def timed(func):
    """
    records approximate durations of function calls
    """
    def wrapper(*args, **kwargs):
        start = time.time()
        print(f'{func.__name__:<30} started')
        result = func(*args, **kwargs)
        duration = f'{func.__name__:<30} finsished in {time.time() - start:.2f} seconds'
        print(duration)
        durations.append(duration)
        return result
    return wrapper


async def fetch(url, session):
    """
    asynchronous get request
    """
    async with session.get(url) as response:
        response_json = await response.json()
        return SimpleNamespace(**response_json)


async def fetch_many(loop, urls):
    """
    many asynchronous get requests, gathered
    """
    async with aiohttp.ClientSession() as session:
        tasks = [loop.create_task(fetch(url, session)) for url in urls]
        return await asyncio.gather(*tasks)


@timed
def asnyc_aiohttp_get_all(urls):
    """
    performs asynchronous get requests
    """
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(fetch_many(loop, urls))


@timed
def sync_requests_get_all(urls):
    """
    performs synchronous get requests
    """
    # use session to reduce network overhead
    session = requests.Session()
    return [SimpleNamespace(**session.get(url).json()) for url in urls]


@timed
def async_requests_get_all(urls):
    """
    asynchronous wrapper around synchronous requests
    """
    loop = asyncio.get_event_loop()
    # use session to reduce network overhead
    session = requests.Session()

    async def async_get(url):
        return session.get(url)

    async_tasks = [loop.create_task(async_get(url)) for url in urls]
    return loop.run_until_complete(asyncio.gather(*async_tasks))


if __name__ == '__main__':
    # this endpoint takes ~3 seconds to respond,
    # so a purely synchronous implementation should take
    # little more than 30 seconds and a purely asynchronous
    # implementation should take little more than 3 seconds.
    urls = ['https://postman-echo.com/delay/3']*10

    sync_requests_get_all(urls)
    async_requests_get_all(urls)
    asnyc_aiohttp_get_all(urls)
    print('----------------------')
    [print(duration) for duration in durations]

На моей машине это результат:

sync_requests_get_all          started
sync_requests_get_all          finsished in 30.92 seconds
async_requests_get_all         started
async_requests_get_all         finsished in 30.87 seconds
asnyc_aiohttp_get_all          started
asnyc_aiohttp_get_all          finsished in 3.22 seconds
----------------------
sync_requests_get_all          finsished in 30.92 seconds
async_requests_get_all         finsished in 30.87 seconds
asnyc_aiohttp_get_all          finsished in 3.22 seconds

Может быть, фьючерсы на запросы - это другой выбор.

from requests_futures.sessions import FuturesSession

session = FuturesSession()
# first request is started in background
future_one = session.get('http://httpbin.org/get')
# second requests is started immediately
future_two = session.get('http://httpbin.org/get?foo=bar')
# wait for the first request to complete, if it hasn't already
response_one = future_one.result()
print('response one status: {0}'.format(response_one.status_code))
print(response_one.content)
# wait for the second request to complete, if it hasn't already
response_two = future_two.result()
print('response two status: {0}'.format(response_two.status_code))
print(response_two.content)

Это также рекомендуется в офисном документе. Если вы не хотите вовлекать Gevent, это хорошо.

Ты можешь использовать httpx для этого.

import httpx

async def get_async(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

urls = ["http://google.com", "http://wikipedia.org"]

# Note that you need an async context to use `await`.
await asyncio.gather(*map(get_async, urls))

если вам нужен функциональный синтаксис, библиотека gamla помещает его вget_async.

Тогда ты можешь сделать


await gamla.map(gamla.get_async(10), ["http://google.com", "http://wikipedia.org"])

В 10 время ожидания в секундах.

(отказ от ответственности: я являюсь его автором)

У меня много проблем с большинством опубликованных ответов - они либо используют устаревшие библиотеки, которые были перенесены с ограниченными функциями, либо предоставляют решение со слишком большим количеством магии при выполнении запроса, что затрудняет обработку ошибок. Если они не попадают в одну из вышеперечисленных категорий, они являются сторонними библиотеками или являются устаревшими.

Некоторые решения работают нормально только в HTTP-запросах, но решения не подходят для любых других запросов, что просто смехотворно. Здесь не требуется строго индивидуального решения.

Просто используя встроенную библиотеку python asyncio достаточно для выполнения асинхронных запросов любого типа, а также обеспечивает достаточную гибкость для обработки сложных и специфичных для конкретных случаев ошибок.

import asyncio

loop = asyncio.get_event_loop()

def do_thing(params):
    async def get_rpc_info_and_do_chores(id):
        # do things
        response = perform_grpc_call(id)
        do_chores(response)

    async def get_httpapi_info_and_do_chores(id):
        # do things
        response = requests.get(URL)
        do_chores(response)

    async_tasks = []
    for element in list(params.list_of_things):
       async_tasks.append(loop.create_task(get_chan_info_and_do_chores(id)))
       async_tasks.append(loop.create_task(get_httpapi_info_and_do_chores(ch_id)))

    loop.run_until_complete(asyncio.gather(*async_tasks))

Принцип работы прост. Вы создаете серию задач, которые хотите выполнять асинхронно, а затем запрашиваете цикл для выполнения этих задач и выхода по завершении. Никаких дополнительных библиотек из-за отсутствия поддержки или отсутствия функциональности не требуется.

Я знаю, что это было закрыто некоторое время, но я подумал, что было бы полезно продвинуть другое асинхронное решение, основанное на библиотеке запросов.

list_of_requests = ['http://moop.com', 'http://doop.com', ...]

from simple_requests import Requests
for response in Requests().swarm(list_of_requests):
    print response.content

Документы находятся здесь: http://pythonhosted.org/simple-requests/

Если вы хотите использовать Asyncio, то requests-async обеспечивает асинхронную / ожидающую функциональность для requests - https://github.com/encode/requests-async

ОТКАЗ: Following code creates different threads for each function.

Это может быть полезно в некоторых случаях, поскольку его проще использовать. Но знайте, что это не асинхронный режим, но он дает иллюзию асинхронности с использованием нескольких потоков, даже если декоратор предлагает это.

Вы можете использовать следующий декоратор для выполнения обратного вызова после завершения выполнения функции, обратный вызов должен обрабатывать данные, возвращаемые функцией.

Обратите внимание, что после оформления функции она вернет Future объект.

      import asyncio

## Decorator implementation of async runner !!
def run_async(callback, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    def inner(func):
        def wrapper(*args, **kwargs):
            def __exec():
                out = func(*args, **kwargs)
                callback(out)
                return out

            return loop.run_in_executor(None, __exec)

        return wrapper

    return inner

Пример реализации:

      urls = ["https://google.com", "https://facebook.com", "https://apple.com", "https://netflix.com"]
loaded_urls = []  # OPTIONAL, used for showing realtime, which urls are loaded !!


def _callback(resp):
    print(resp.url)
    print(resp)
    loaded_urls.append((resp.url, resp))  # OPTIONAL, used for showing realtime, which urls are loaded !!


# Must provide a callback function, callback func will be executed after the func completes execution
# Callback function will accept the value returned by the function.
@run_async(_callback)
def get(url):
    return requests.get(url)


for url in urls:
    get(url)

Если вы хотите видеть, какие URL-адреса загружаются в режиме реального времени, вы также можете добавить следующий код в конце:

      while True:
    print(loaded_urls)
    if len(loaded_urls) == len(urls):
        break
threads=list()

for requestURI in requests:
    t = Thread(target=self.openURL, args=(requestURI,))
    t.start()
    threads.append(t)

for thread in threads:
    thread.join()

...

def openURL(self, requestURI):
    o = urllib2.urlopen(requestURI, timeout = 600)
    o...

Я приведенное выше предложение использовать поддерживаюHTTPX , но я часто использую его по-другому, поэтому добавляю свой ответ.

Я лично использую asyncio.run( введено в Python 3.7), а не asyncio.gather а также предпочитаю aiostream подход, который можно использовать в сочетании с asyncio и httpx.

Как и в этом примере, который я только что опубликовал, этот стиль полезен для асинхронной обработки набора URL-адресов, даже несмотря на (обычное) возникновение ошибок. Мне особенно нравится, как этот стиль проясняет, где происходит обработка ответа, и для простоты обработки ошибок (что, как я считаю, асинхронные вызовы обычно дают больше).

Легче опубликовать простой пример асинхронного запуска кучи запросов, но часто вы также хотите обрабатывать содержимое ответа (вычислить что-нибудь с ним, возможно, со ссылкой на исходный объект, с которым был запрошен URL-адрес).

Суть этого подхода выглядит так:

      async with httpx.AsyncClient(timeout=timeout) as session:
    ws = stream.repeat(session)
    xs = stream.zip(ws, stream.iterate(urls))
    ys = stream.starmap(xs, fetch, ordered=False, task_limit=20)
    process = partial(process_thing, things=things, pbar=pbar, verbose=verbose)
    zs = stream.map(ys, process)
    return await zs

куда:

  • process_thing это функция обработки содержимого асинхронного ответа
  • это входной список (который urls генератор строк URL), например, список объектов / словарей
  • pbar это индикатор выполнения (например, tqdm.tqdm) [необязательно, но полезно]

Все это выполняется в асинхронной функции async_fetch_urlset который затем запускается путем вызова синхронной функции верхнего уровня с именем, например, fetch_things который запускает сопрограмму [это то, что возвращает асинхронная функция] и управляет циклом событий:

      def fetch_things(urls, things, pbar=None, verbose=False):
    return asyncio.run(async_fetch_urlset(urls, things, pbar, verbose))

Поскольку в качестве входных данных передан список (здесь things) можно изменить на месте, вы можете эффективно получить результат обратно (как мы привыкли к синхронным вызовам функций)

Я уже некоторое время использую запросы Python для асинхронных вызовов к gist API github.

Для примера посмотрите код здесь:

https://github.com/davidthewatson/flasgist/blob/master/views.py

Этот стиль Python может быть не самым ярким примером, но я могу заверить вас, что код работает. Дайте мне знать, если это вас смущает, и я запишу это.

Я также попробовал некоторые вещи, используя асинхронные методы в python, но мне повезло больше, если использовать twisted для асинхронного программирования. У него меньше проблем и он хорошо документирован. Вот ссылка на что-то похожее на то, что вы пытаетесь в витой.

http://pythonquirks.blogspot.com/2011/04/twisted-asynchronous-http-request.html

Я бы очень рекомендовал для этого Hyper_requests (https://github.com/edjones84/hyper-requests), который позволяет генерировать список URL-адресов и параметров, а затем асинхронно запускать запросы:

      import hyper_requests

# Define the request parameters
params = [
    {'url': 'http://httpbin.org/get' , 'data': 'value1'},
    {'url': 'http://httpbin.org/get' , 'data': 'value3'},
    {'url': 'http://httpbin.org/get' , 'data': 'value5'},
    {'url': 'http://httpbin.org/get' , 'data': 'value7'},
    {'url': 'http://httpbin.org/get' , 'data': 'value9'}
]

# Create an instance of AsyncRequests and execute the requests
returned_data = hyper_requests.get(request_params=params, workers=10)

# Process the returned data
for response in returned_data:
    print(response)

Ни один из приведенных выше ответов не помог мне, потому что они предполагают, что у вас есть предопределенный список запросов, а в моем случае мне нужно иметь возможность прослушивать запросы и отвечать асинхронно (аналогично тому, как это работает в nodejs).

      def handle_finished_request(r, **kwargs):
    print(r)


# while True:
def main():
    while True:
        address = listen_to_new_msg()  # based on your server

        # schedule async requests and run 'handle_finished_request' on response
        req = grequests.get(address, timeout=1, hooks=dict(response=handle_finished_request))
        job = grequests.send(req)  # does not block! for more info see https://stackoverflow.com/a/16016635/10577976


main()

вhandle_finished_requestобратный вызов будет вызван при получении ответа. примечание: по какой-то причине тайм-аут (или отсутствие ответа) здесь не вызывает ошибку

Этот простой цикл может запускать асинхронные запросы аналогично тому, как это будет работать на сервере nodejs.

Другие вопросы по тегам