Как собрать результаты задачи в Трио?
Я написал скрипт, который использует детскую и модуль asks для циклического прохождения и вызова API на основе переменных цикла. Я получаю ответы, но не знаю, как вернуть данные, как вы бы с asyncio.
У меня также есть вопрос об ограничении API до 5 в секунду.
from datetime import datetime
import asks
import time
import trio
asks.init("trio")
s = asks.Session(connections=4)
async def main():
start_time = time.time()
api_key = 'API-KEY'
org_id = 'ORG-ID'
networkIds = ['id1','id2','idn']
url = 'https://api.meraki.com/api/v0/networks/{0}/airMarshal?timespan=3600'
headers = {'X-Cisco-Meraki-API-Key': api_key, 'Content-Type': 'application/json'}
async with trio.open_nursery() as nursery:
for i in networkIds:
nursery.start_soon(fetch, url.format(i), headers)
print("Total time:", time.time() - start_time)
async def fetch(url, headers):
print("Start: ", url)
response = await s.get(url, headers=headers)
print("Finished: ", url, len(response.content), response.status_code)
if __name__ == "__main__":
trio.run(main)
Когда я запускаю nursery.start_soon(fetch...), я печатаю данные в fetch, но как мне вернуть данные? Я не видел ничего похожего на функцию asyncio.gather(*tasks).
Кроме того, я могу ограничить количество сеансов до 1-4, что помогает снизить ограничение до 5 API в секунду, но мне было интересно, существует ли встроенный способ, обеспечивающий вызов не более 5 API в любой данной секунде?
5 ответов
Возвращение данных: передайте сетевой идентификатор и диктовку fetch
задачи:
async def main():
…
results = {}
async with trio.open_nursery() as nursery:
for i in networkIds:
nursery.start_soon(fetch, url.format(i), headers, results, i)
## results are available here
async def fetch(url, headers, results, i):
print("Start: ", url)
response = await s.get(url, headers=headers)
print("Finished: ", url, len(response.content), response.status_code)
results[i] = response
Альтернативно, создайте trio.Queue
к которому ты put
результаты, достижения; Ваша основная задача может затем прочитать результаты из очереди.
Ограничение API: создать trio.Queue(10)
и запустите задание по следующим направлениям:
async def limiter(queue):
while True:
await trio.sleep(0.2)
await queue.put(None)
Передайте эту очередь fetch
в качестве еще одного аргумента и вызова await limit_queue.get()
перед каждым вызовом API.
Технически, trio.Queue
устарел в трио 0,9. Он был заменен trio.open_memory_channel
,
Краткий пример:
sender, receiver = trio.open_memory_channel(len(networkIds)
async with trio.open_nursery() as nursery:
for i in networkIds:
nursery.start_soon(fetch, sender, url.format(i), headers)
async for value in receiver:
# Do your job here
pass
И в твоем fetch
функция, которую вы должны вызвать async sender.send(value)
где-то.
На основе этих ответов вы можете определить следующую функцию:
async def gather(*tasks):
async def collect(index, task, results):
task_func, *task_args = task
results[index] = await task_func(*task_args)
results = {}
async with trio.open_nursery() as nursery:
for index, task in enumerate(tasks):
nursery.start_soon(collect, index, task, results)
return [results[i] for i in range(len(tasks))]
Затем вы можете использовать трио точно так же, как asyncio, просто исправив трио (добавив функцию сбора):
import trio
trio.gather = gather
Вот практический пример:
async def child(x):
print(f"Child sleeping {x}")
await trio.sleep(x)
return 2*x
async def parent():
tasks = [(child, t) for t in range(3)]
return await trio.gather(*tasks)
print("results:", trio.run(parent))
Когда я запускаю nursery.start_soon(fetch...), я печатаю данные в fetch, но как мне вернуть данные? Я не видел ничего похожего на функцию asyncio.gather(*tasks).
Вы задаете два разных вопроса, поэтому я просто отвечу на этот. Матиас уже ответил на ваш другой вопрос.
Когда вы звоните start_soon()
, вы просите Trio запустить задачу в фоновом режиме, а затем продолжать работу. Вот почему Trio умеет бегать fetch()
несколько раз одновременно. Но поскольку Trio продолжает работать, нет способа "вернуть" результат, как это обычно делает функция Python. куда бы он вообще вернулся?
Вы можете использовать очередь, чтобы fetch()
задачи отправляют результаты в другую задачу для дополнительной обработки.
Чтобы создать очередь:
response_queue = trio.Queue()
Когда вы запускаете задачи извлечения, передайте очередь в качестве аргумента и отправьте sentintel в очередь, когда вы закончите:
async with trio.open_nursery() as nursery:
for i in networkIds:
nursery.start_soon(fetch, url.format(i), headers)
await response_queue.put(None)
После того, как вы загрузите URL, поместите ответ в очередь:
async def fetch(url, headers, response_queue):
print("Start: ", url)
response = await s.get(url, headers=headers)
# Add responses to queue
await response_queue.put(response)
print("Finished: ", url, len(response.content), response.status_code)
С учетом вышеуказанных изменений ваши задачи извлечения будут помещать ответы в очередь. Теперь вам нужно прочитать ответы из очереди, чтобы вы могли обработать их. Вы можете добавить новую функцию для этого:
async def process(response_queue):
async for response in response_queue:
if response is None:
break
# Do whatever processing you want here.
Вы должны запустить эту функцию процесса как фоновую задачу, прежде чем запускать какие-либо задачи извлечения, чтобы она обрабатывала ответы сразу после их получения.
Подробнее читайте в разделе " Синхронизация и связь между задачами " документации Trio.
Как говорится в ответе @Adrien Clerc:trio.Queue
устарело: https://trio.readthedocs.io/en/stable/history.html?highlight=trio.Queue#id40
Для связи задач в Trio см.: https://trio.readthedocs.io/en/latest/reference-core.html#using-channels-to-pass-values-between-tasks .
Вот полный рабочий минимальный пример (удаление запроса на получение асинхронного URL-адреса и замена на сон) для вашего варианта использования с использованиемopen_memory_channel
import datetime
import trio
async def main():
network_ids = ["id1", "id2", "idn"]
url = "https://api.meraki.com/api/v0/networks/{0}/airMarshal?timespan=3600"
send_channel, receive_channel = trio.open_memory_channel(len(network_ids))
async with trio.open_nursery() as nursery:
nursery.start_soon(producer, send_channel, url, network_ids)
nursery.start_soon(consumer, receive_channel)
async def producer(send_channel, url, network_ids):
async with send_channel:
async with trio.open_nursery() as nursery:
for i in network_ids:
nursery.start_soon(fetch, send_channel, url.format(i))
async def consumer(receive_channel):
async with receive_channel:
async for value in receive_channel:
# Do your job here
print(f"value received: {value} at time {datetime.datetime.utcnow()}")
async def fetch(send_channel, url):
print(f"Start: {datetime.datetime.utcnow()}")
await trio.sleep(1)
response = f"response for {url}"
await send_channel.send(response)
print(f"Finished: {datetime.datetime.utcnow()}")
if __name__ == "__main__":
trio.run(main)
Это печатает:
Start: 2023-03-03 10:21:24.883787
Start: 2023-03-03 10:21:24.883787
Start: 2023-03-03 10:21:24.883787
Finished: 2023-03-03 10:21:25.887040
Finished: 2023-03-03 10:21:25.887040
Finished: 2023-03-03 10:21:25.887040
value received: response for https://api.meraki.com/api/v0/networks/id1/airMarshal?timespan=3600 at time 2023-03-03 10:21:25.887040
value received: response for https://api.meraki.com/api/v0/networks/id2/airMarshal?timespan=3600 at time 2023-03-03 10:21:25.887040
value received: response for https://api.meraki.com/api/v0/networks/idn/airMarshal?timespan=3600 at time 2023-03-03 10:21:25.887040