Простейший пример асинхронности / ожидания в Python

Я прочитал много примеров, постов в блоге, вопросов / ответов о asyncio / async / await в Python 3.5+ многие были сложными, самый простой, который я нашел, был, вероятно, этот. Все еще использует ensure_future и в целях изучения асинхронного программирования на Python я хотел бы посмотреть, возможен ли еще более минимальный пример (т. е. какие минимальные инструменты необходимы для выполнения базового примера асинхронного ожидания / ожидания).

Вопрос: для изучения асинхронного программирования на Python можно привести простой пример, показывающий, как async / await работает, используя только эти два ключевых слова + asyncio.get_event_loop() + run_until_complete + другой код Python, но не другой asyncio функции?

Пример: что-то вроде этого:

import asyncio

async def async_foo():
    print("async_foo started")
    await asyncio.sleep(5)
    print("async_foo done")

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget async_foo()
    print('Do some actions 1')
    await asyncio.sleep(5)
    print('Do some actions 2')

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

но без ensure_future и все еще демонстрирует, как работает await / async.

10 ответов

Чтобы ответить на ваши вопросы, я предоставлю 3 разных решения одной и той же проблемы.

случай 1: просто нормальный питон

import time

def sleep():
    print(f'Time: {time.time() - start:.2f}')
    time.sleep(1)

def sum(name, numbers):
    total = 0
    for number in numbers:
        print(f'Task {name}: Computing {total}+{number}')
        sleep()
        total += number
    print(f'Task {name}: Sum = {total}\n')

start = time.time()
tasks = [
    sum("A", [1, 2]),
    sum("B", [1, 2, 3]),
]
end = time.time()
print(f'Time: {end-start:.2f} sec')

выход:

Task A: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task A: Sum = 3

Task B: Computing 0+1
Time: 2.01
Task B: Computing 1+2
Time: 3.01
Task B: Computing 3+3
Time: 4.01
Task B: Sum = 6

Time: 5.02 sec

случай 2: асинхронное ожидание выполнено неправильно

import asyncio
import time

async def sleep():
    print(f'Time: {time.time() - start:.2f}')
    time.sleep(1)

async def sum(name, numbers):
    total = 0
    for number in numbers:
        print(f'Task {name}: Computing {total}+{number}')
        await sleep()
        total += number
    print(f'Task {name}: Sum = {total}\n')

start = time.time()

loop = asyncio.get_event_loop()
tasks = [
    loop.create_task(sum("A", [1, 2])),
    loop.create_task(sum("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

end = time.time()
print(f'Time: {end-start:.2f} sec')

выход:

Task A: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task A: Sum = 3

Task B: Computing 0+1
Time: 2.01
Task B: Computing 1+2
Time: 3.01
Task B: Computing 3+3
Time: 4.01
Task B: Sum = 6

Time: 5.01 sec

случай 3: асинхронное / ожидание выполнено правильно (так же, как в случае 2, за исключением sleep функция)

import asyncio
import time

async def sleep():
    print(f'Time: {time.time() - start:.2f}')
    await asyncio.sleep(1)

async def sum(name, numbers):
    total = 0
    for number in numbers:
        print(f'Task {name}: Computing {total}+{number}')
        await sleep()
        total += number
    print(f'Task {name}: Sum = {total}\n')

start = time.time()

loop = asyncio.get_event_loop()
tasks = [
    loop.create_task(sum("A", [1, 2])),
    loop.create_task(sum("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

end = time.time()
print(f'Time: {end-start:.2f} sec')

выход:

Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task B: Computing 1+2
Time: 1.00
Task A: Sum = 3

Task B: Computing 3+3
Time: 2.00
Task B: Sum = 6

Time: 3.01 sec

case 1 с case 2 дать то же самое 5 seconds, в то время как case 3 просто 3 seconds, Итак async/await done right быстрее.

Причина различий заключается в реализации sleep функция.

# case 1
def sleep():
    print(f'Time: {time.time() - start:.2f}')
    time.sleep(1)

# case 2
async def sleep():
    print(f'Time: {time.time() - start:.2f}')
    time.sleep(1)

# case 3
async def sleep():
    print(f'Time: {time.time() - start:.2f}')
    await asyncio.sleep(1)

sleep функция в case 1 а также case 2 подобные". Они "спят", не позволяя другим использовать ресурсы. В то время как case 3 разрешает доступ к ресурсам, когда он спит.

В case 2 мы добавили async к нормальной функции. Однако цикл обработки событий запустит его без прерывания. Зачем? Потому что мы не сказали, где циклу разрешено прерывать вашу функцию для запуска другой задачи.

В case 3 мы указали в цикле обработки событий, где именно нужно прервать функцию для запуска другой задачи. Где именно?

# case 3
async def sleep():
    print(f'Time: {time.time() - start:.2f}')
    await asyncio.sleep(1) # <-- Right here!

Подробнее об этом читайте здесь

Можно ли привести простой пример, показывающий, как async / await работает, используя только эти два ключевых слова + asyncio.get_event_loop() + run_until_complete + другой код Python, но не другой asyncio функции?

Таким образом, можно написать код, который работает:

import asyncio


async def main():
    print('done!')


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Но таким образом невозможно продемонстрировать, почему вам нужно asyncio.

Кстати, зачем тебе asyncio, а не просто код? Ответ - asyncio позволяет получить выигрыш в производительности, когда вы распараллеливаете операции блокировки ввода / вывода (например, чтение / запись в сеть). И чтобы написать полезный пример, вам нужно использовать асинхронную реализацию этих операций.

Пожалуйста, прочитайте этот ответ для более подробного объяснения.

Upd:

хорошо, вот пример, который использует asyncio.sleep имитировать операцию блокировки ввода-вывода и asyncio.gather это показывает, как вы можете запустить несколько операций блокировки одновременно:

import asyncio


async def io_related(name):
    print(f'{name} started')
    await asyncio.sleep(1)
    print(f'{name} finished')


async def main():
    await asyncio.gather(
        io_related('first'),
        io_related('second'),
    )  # 1s + 1s = over 1s


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Выход:

first started
second started
first finished
second finished
[Finished in 1.2s]

Обратите внимание, как оба io_related началось потом, спустя всего одну секунду, оба сделали.

Python 3.7+ теперь имеет более простой API (на мой взгляд) с более простой формулировкой (легче запомнить, чем "sure_future"): вы можете использоватьcreate_task который возвращает объект Task (который может быть полезен позже для отмены задачи при необходимости).

Базовый пример 1

import asyncio

async def hello(i):
    print(f"hello {i} started")
    await asyncio.sleep(4)
    print(f"hello {i} done")

async def main():
    task1 = asyncio.create_task(hello(1))  # returns immediately, the task is created
    await asyncio.sleep(3)
    task2 = asyncio.create_task(hello(2))
    await task1
    await task2

asyncio.run(main())  # main loop

Результат:

привет 1 начал
привет 2 начал
привет 1 готово
привет 2 готово


Базовый пример 2

Если вам нужно получить возвращаемое значение этих асинхронных функций, тогдаgatherПолезно. Следующий пример основан на документации, но, к сожалению, в документе не показано, чтоgather действительно полезно для: получения возвращаемых значений!

import asyncio

async def factorial(n):
    f = 1
    for i in range(2, n + 1):
        print(f"Computing factorial({n}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    return f

async def main():
    L = await asyncio.gather(factorial(2), factorial(3), factorial(4))
    print(L)  # [2, 6, 24]

asyncio.run(main())

Ожидаемый результат:

Вычисление факториала (2), в настоящее время i=2...
Вычисление факториала (3), в настоящее время i=2...
Вычисление факториала (4), в настоящее время i=2...
Вычисление факториала (3), в настоящее время i=3...
Вычисление факториала (4), в настоящее время i=3...
Вычисление факториала (4), в настоящее время i = 4...
[2, 6, 24]


PS: даже если вы используете asyncio, и нет trio, учебник последнего был полезен для меня при изучении асинхронного программирования Python.

Поскольку все хорошо объяснено, давайте запустим несколько примеров с циклами событий, сравним синхронный код с асинхронным.

синхронный код:

import time

def count():
    time.sleep(1)
    print('1')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('3')

def main():
    for i in range(3):
        count()

if __name__ == "__main__":
    t = time.perf_counter()
    main()
    t2 = time.perf_counter()
    
    print(f'Total time elapsed: {t2:0.2f} seconds')

вывод:

1
2
3
1
2
3
1
2
3
Total time elapsed: 9.00 seconds

Мы видим, что каждый цикл счета завершается до начала следующего цикла.

асинхронный код:

import asyncio
import time

async def count():
    await asyncio.sleep(1)
    print('1')
    await asyncio.sleep(1)
    print('2')
    await asyncio.sleep(1)
    print('3')

async def main():
    await asyncio.gather(count(), count(), count())

if __name__ == "__main__":
    t = time.perf_counter()
    asyncio.run(main())
    t2 = time.perf_counter()

    print(f'Total time elapsed: {t2:0.2f} seconds')

вывод:

1
1
1
2
2
2
3
3
3
Total time elapsed: 3.00 seconds

Асинхронный эквивалент, с другой стороны, выглядит сомнительно, как будто для этого потребовалось три секунды, а не девять секунд. Первый цикл счета был запущен, и как только он достиг awaits sleep one Python был свободен для выполнения другой работы, например, запуска второго и последующего третьего циклов счета. Вот почему у нас есть все то, то все трубки, то все три. Одновременное программирование вывода может быть очень ценным инструментом. Многопроцессорность предполагает выполнение всей многозадачной работы, и в Python это единственный вариант многоядерного параллелизма, при котором ваша программа выполняется на нескольких ядрах ЦП. При использовании потоков операционная система по-прежнему выполняет всю многозадачную работу, а в cpython глобальная блокировка intrepeter предотвращает многоядерный параллелизм при асинхронном программировании. Операционная система не вмешивается, есть один процесс, есть один поток, поэтому то, что происходит, задачи могут освободить ЦП, когда есть периоды ожидания, чтобы другая задача могла его использовать.

import asyncio

loop = asyncio.get_event_loop()


async def greeter(name):
    print(f"Hi, {name} you're in a coroutine.")

try:
    print('starting coroutine')
    coro = greeter('LP')
    print('entering event loop')
    loop.run_until_complete(coro)
finally:
    print('closing event loop')
    loop.close()

вывод:

starting coroutine
entering event loop
Hi, LP you're in a coroutine.
closing event loop

Асинхронным фреймворкам нужен планировщик, обычно называемый циклом событий. Этот цикл событий отслеживает все запущенные задачи, и когда функция приостановлена, он возвращает управление циклу событий, который затем находит другую функцию для запуска или возобновления, и это называется совместной многозадачностью. Асинхронный ввод-вывод обеспечивает структуру асинхронной структуры, которая сосредоточена на этом цикле событий, и она эффективно обрабатывает события ввода / вывода, которые приложение взаимодействует с циклом событий, явно регистрирует код для запуска, а затем позволяет циклу событий планировщику выполнять необходимые вызовы. код приложения, когда ресурсы доступны. Итак, если сетевой сервер открывает сокеты, а затем регистрирует их, чтобы им было сообщено, когда на них происходят события ввода, цикл событий будет предупреждать код сервера, когда есть новое входящее соединение или когда есть 'данные для чтения. Если из сокета не нужно читать больше данных, чем сервер, то управление возвращается в цикл обработки событий.

Механизм передачи управления циклу обработки событий зависит от совместных подпрограмм. Совместные подпрограммы - это языковая конструкция, разработанная для параллельной работы. Совместная подпрограмма может приостановить выполнение с помощью ключевого слова awake с другой сопрограммой, и пока она приостановлена, состояние совместной подпрограммы поддерживается, позволяя ему возобновить работу с того места, на котором она остановилась, одна сопрограмма может запустить другую, а затем ждать результатов, и это упрощает разложение задачи на многоразовые части.

import asyncio

loop = asyncio.get_event_loop()

async def outer():
    print('in outer')
    print('waiting for result 1')
    result1 = await phase1()
    print('waiting for result 2')
    result2 = await phase2(result1)
    return result1, result2


async def phase1():
    print('in phase1')
    return 'phase1 result'

async def phase2(arg):
    print('in phase2')
    return 'result2 derived from {}'.format(arg)

asyncio.run(outer())

вывод:

in outer
waiting for result 1
in phase1
waiting for result 2
in phase2

В этом примере запрашиваются две фазы, которые должны выполняться по порядку, но которые могут выполняться одновременно с другими операциями. В awakeключевое слово используется вместо добавления новых сопрограмм в цикл, потому что поток управления уже находится внутри сопрограммы, управляемой циклом. Нет необходимости указывать циклу управлять новыми совместными подпрограммами.

Я не знаю почему, но все объяснения по этой теме слишком сложны или они используют примеры с бесполезным asyncio.sleep () ... Пока что лучший образец кода, который я нашел, это: https://codeflex.co / python3-async-ожидание-пример /

Просто.. Мило.. Потрясающе.. ✅

        import asyncio
  import time
  import random

  async def eat():
     wait = random.randint(0,3)
     await asyncio.sleep(wait)
     print("Done With Eating")

  async def sleep():
     wait = random.randint(0,3)
     await asyncio.sleep(wait)
     print("Done With Sleeping")

  async def repeat():
     wait = random.randint(0,3)
     await asyncio.sleep(wait)
     print("Done With Repeating")

  async def main():
     for x in range(5):
        await asyncio.gather(eat(),sleep(),repeat())
        time.sleep(2)
        print("+","-"*20)

  if __name__ == "__main__":
     t = time.perf_counter()
     asyncio.run(main())
     t2 = time.perf_counter()

     print(f'Total time elapsed: {t2:0.2f} seconds')

Кажется, все сосредоточены на том, чтобы переключиться на asyncio.sleep, но в реальном мире это невозможно. Иногда вам нужно выполнить вызов библиотеки, который, возможно, вызывает вызов API (например: запрос подписанного URL-адреса из Google).

Вот как вы все еще можете использовать time.sleep, но асинхронно:

      import asyncio
import time
from concurrent.futures.thread import ThreadPoolExecutor

def sleep():
    print(f'Time: {time.time() - start:.2f}')
    time.sleep(1)

async def sum(name, numbers):
    _executor = ThreadPoolExecutor(2)
    total = 0
    for number in numbers:
        print(f'Task {name}: Computing {total}+{number}')
        await loop.run_in_executor(_executor, sleep)
        total += number
    print(f'Task {name}: Sum = {total}\n')

start = time.time()

loop = asyncio.get_event_loop()
tasks = [
    loop.create_task(sum("A", [1, 2])),
    loop.create_task(sum("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

end = time.time()
print(f'Time: {end-start:.2f} sec')

Выход:

      Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task B: Computing 1+2
Time: 1.00
Task A: Sum = 3

Task B: Computing 3+3
Time: 2.01
Task B: Sum = 6

Time: 3.01 sec
      import asyncio
import requests

async def fetch_users():
    response = requests.get('https://www.testjsonapi.com/users/')
    users = response.json()
    return users

async def print_users():
    # create an asynchronous task to run concurrently 
    # which wont block executing print statement before it finishes
    response = asyncio.create_task(fetch_users())
    print("Fetching users ")
    # wait to get users data from response before printing users
    users = await response

    for user in users:
        print(f"name : {user['name']} email : {user['email']}")

asyncio.run(print_users())
print("All users printed in console")

вывод будет выглядеть так

      Fetching users
name : Harjas Malhotra email : harjas@gmail.com
name : Alisha Paul email : alisha@gmail.com
name : Mart Right email : marrk9658@yahoo.com
name : Brad Pitter email : brad@gmail.com
name : Ervin Dugg email : Ervin69@gmail.com 
name : Graham Bell email : Graham@bell.biz
name : James Rush email : james369@hotmail.com
name : Deepak Dev email : deepak@gmail.com
name : Ajay Rich email : therichposts@gmail.com
All users printed in console

Посмотрим, как работает код. Во-первых, когда python вызовет его, он не позволит выполнить оператор печати под ним до тех пор, пока он не завершится. Итак, после входа внутрь будет создана параллельная задача, чтобы операторы под ней могли выполняться одновременно с этой задачей, которая находится здесь. когда эта задача будет запущена за это время Fetching usersбудет напечатан в консоли. После этого питон будет ждать ответа от, потому что пользователи не должны печататься до получения. после завершения fetch_users()все имя пользователя и адрес электронной почты будут напечатаны в консоли. Таким образом, после завершения print_users() Оператор печати ниже будет выполнен.

Очень простой и гладкий пример здесь:

      import asyncio

async def my_task1():
    print("Task 1 started")
    await asyncio.sleep(1)  # some light io task
    print("Task 1 completed")
    return "Done1"

async def my_task2():
    print("Task 2 started")
    await asyncio.sleep(2)  # some heavy io task
    print("Task 2 completed")
    return "Done2"

async def main():
    # both the functions are independent of each other, 
    # as tasks gets completes, `.gather` keeps on storing the results
    results = await asyncio.gather(my_task1(), my_task2())
    print(f"The results are {results}")
    
    # if task1 is dependent on completion of task2, then use this
    ret1 = await my_task2()
    ret2 = await my_task1()
    print(f"The ret1: {ret1} ret2 {ret2}")

asyncio.run(main())

Даже думал, что некоторые ответы сверху, я думал, были немного абстрактными

      from datetime import datetime
import asyncio




async def time_taking(max_val,task_no):
    print("**TASK STARTING TO EXECUTE CONCURRENT TASk NO {} ***".format(task_no))

    await asyncio.sleep(2)
    value_list = []
    for i in range(0,max_val):
        value_list.append(i)

    print("****FINSIHING UP TASk NO {}  **".format(task_no))
    return value_list



async def test2(task_no):
    await asyncio.sleep(5)
    print("**TASK STARTING TO EXECUTE CONCURRENT TASk NO {} ***".format(task_no))
    await asyncio.sleep(5)
    print("****FINSIHING UP  TASk NO {}  **".format(task_no))

async def function(value = None):
    tasks = []
    start_time = datetime.now()
    
    # CONCURRENT TASKS
    tasks.append(asyncio.create_task(time_taking(20,1)))
    tasks.append(asyncio.create_task(time_taking(43,2)))
    tasks.append(asyncio.create_task(test2(3)))
    
    # concurrent execution
    lists = await asyncio.gather(*tasks)
    end_time = datetime.now()
    
    time_taken = end_time - start_time
    return lists,time_taken


# run inside event loop 
res,time_taken = asyncio.run(function())

print(res,time_taken)
Другие вопросы по тегам