Как вызвать асинхронную функцию из синхронизированного кода Python

Поэтому я привязан к интерпретатору python 3.6.2, который следует за моим настольным приложением.

То, что я хочу, это вызвать асинхронную функцию из синхронизированного метода или функции.

При вызове функции python из настольного приложения это должна быть обычная функция, которую нельзя ожидать.

Из настольного приложения я могу отправить список URL-адресов, и мне нужно отправить ответный ответ с каждого URL-адреса в асинхронном режиме.

вот моя попытка, я пометил SyntaxError, которую я не знаю, как обойти.

import fmeobjects
import asyncio
import aiohttp
import async_timeout
logger = fmeobjects.FMELogFile()
timeout = 10

class FeatureProcessor(object):
    def __init__(self):
        pass
    def input(self, feature):
        urls_and_coords = zip(feature.getAttribute('_list{}._wms'),\
        feature.getAttribute('_list{}._xmin'),\
        feature.getAttribute('_list{}._ymin'),\
        feature.getAttribute('_list{}._xmax'),\
        feature.getAttribute('_list{}._ymax'))
        -> SyntaxError: newfeature = await main(urls_and_coords)
        self.pyoutput(newfeature)

    def close(self):
       pass 

async def main(urls):
    loop = asyncio.get_event_loop()
    async with aiohttp.ClientSession(loop=loop) as session:
        feature = loop.run_until_complete(fetch_all(session, urls, loop))
        return feature

async def fetch_all(session, urls, loop):
    results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
    return results


async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url[0]) as response:
            newFeature = fmeobjects.FMEFeature()
            response_data = await response
            newFeature.setAttribute('response', response_data)
            newFeature.setAttribute('_xmin',url[1])
            newFeature.setAttribute('_xmax',url[2])
            newFeature.setAttribute('_ymin',url[3])
            newFeature.setAttribute('_ymax',url[4])
            return newFeature

Я попытался внести эти изменения: импорт fme импорт fmeobjects импорт asyncio импорт aiohttp импорт async_timeout logger = fmeobjects.FMELogFile()

class FeatureProcessor(object):
    def __init__(self):
        pass
    def input(self, feature):
        urls_and_coords = zip(feature.getAttribute('_list{}._wms'),\
        feature.getAttribute('_list{}._xmin'),\
        feature.getAttribute('_list{}._ymin'),\
        feature.getAttribute('_list{}._xmax'),\
        feature.getAttribute('_list{}._ymax'))
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(main(loop, urls_and_coords))
        #feature.setAttribute('result',result)
        self.pyoutput(feature)

    def close(self):
       pass 

async def main(loop, urls):
    async with aiohttp.ClientSession(loop=loop) as session:
        return await fetch_all(session, urls, loop)


async def fetch_all(session, urls, loop):
    results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
    return results


async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url[0]) as response:
            #newFeature = fmeobjects.FMEFeature()
            response = await response
            #newFeature.setAttribute('response', response_data)
            #newFeature.setAttribute('_xmin',url[1])
            #newFeature.setAttribute('_xmax',url[2])
            #newFeature.setAttribute('_ymin',url[3])
            #newFeature.setAttribute('_ymax',url[4])
            return response, url[1], url[2], url[3], url[4]

но теперь я в конечном итоге с этой ошибкой:

Python Exception <TypeError>: object ClientResponse can't be used in 'await' 
expression
Traceback (most recent call last):
  File "<string>", line 20, in input
  File "asyncio\base_events.py", line 467, in run_until_complete
  File "<string>", line 29, in main
  File "<string>", line 33, in fetch_all
  File "<string>", line 41, in fetch
TypeError: object ClientResponse can't be used in 'await' expression

4 ответа

Ответ @deceze - это, вероятно, лучшее, что вы можете сделать в Python 3.6. Но в Python 3.7 вы можете напрямую использовать asyncio.run следующим образом:

newfeature = asyncio.run(main(urls))

Он будет правильно создавать, обрабатывать и закрывать event_loop,

Вы бы использовали цикл обработки событий для выполнения асинхронной функции до завершения:

newfeature = asyncio.get_event_loop().run_until_complete(main(urls_and_coords))

(Эта техника уже используется внутри main, И я не уверен почему, так как main является async Вы могли / должны использовать await fetch_all(...) там.)

Другой вариант, который может быть полезен, - это syncer Пакет PyPI:

from syncer import sync

@sync
async def print_data():
    print(await get_data())

print_data()  # Can be called synchronously

Мне удалось заставить это работать на чистом Python 3.10, используя встроенныйasyncio.run_coroutine_threadsafe.

Это ново для меня, поэтому, вероятно, есть некоторые предостережения, например, поскольку асинхронный метод на самом деле не ожидается, процесс может (будет ) завершиться до завершения обратного вызова ( если вы не сделаете что-то, чтобы гарантировать, что этого не произойдет ).

Информацию о том, где это может произойти в дикой природе, см.bleakКласс библиотеки BLEBleakClientметод обратного вызоваdisconnected_callback. Затем в обратном вызове попробуйтеemitиспользуя асинхронную версиюsocket.ioклиент,AsyncClient.

Краткая проблема/решение:

      import asyncio
from typing import Callable

Callback = Callable[[int], None]


class SomeSystem:
    """Some library you don't control that is mostly async, but provides a callback that
    is _not_ async."""

    def __init__(self, callback: Callback):
        self._callback = callback

    async def do_something(self):
        """do some work and then call the non-async callback"""
        await asyncio.sleep(1.0)
        self._callback(1)
        await asyncio.sleep(1.0)
        self._callback(2)


async def some_async_method(value: int):
    """some long-running operation normally called by async code"""
    await asyncio.sleep(0.1)
    print(f"long-running: {value}")


async def main():
    """main is async and started as normal with asyncio.run"""
    print("BEGIN main")

    loop = asyncio.get_running_loop()

    def cb(value: int) -> None:
        """This method _cannot_ be async, due to the underlying implementation of SomeSystem."""
        # some_async_method(value)  # RuntimeWarning: coroutine 'some_async_method' was never awaited
        asyncio.run_coroutine_threadsafe(some_async_method(value), loop)  # okay

    system = SomeSystem(cb)
    await system.do_something()

    # maybe ensure the last call to async method is awaited? Without this call, the final callback
    # won't be handled, since it's never being awaited. If anyone knows how to properly wait
    # for this, let me know in the comments!
    await asyncio.sleep(1.0)

    print("END main")


if __name__ == "__main__":
    asyncio.run(main())

Выход

      BEGIN main
long-running: 1
long-running: 2
END main
Другие вопросы по тегам