Как вызвать асинхронную функцию из синхронизированного кода Python
Поэтому я привязан к интерпретатору python 3.6.2, который следует за моим настольным приложением.
То, что я хочу, это вызвать асинхронную функцию из синхронизированного метода или функции.
При вызове функции python из настольного приложения это должна быть обычная функция, которую нельзя ожидать.
Из настольного приложения я могу отправить список URL-адресов, и мне нужно отправить ответный ответ с каждого URL-адреса в асинхронном режиме.
вот моя попытка, я пометил SyntaxError, которую я не знаю, как обойти.
import fmeobjects
import asyncio
import aiohttp
import async_timeout
logger = fmeobjects.FMELogFile()
timeout = 10
class FeatureProcessor(object):
def __init__(self):
pass
def input(self, feature):
urls_and_coords = zip(feature.getAttribute('_list{}._wms'),\
feature.getAttribute('_list{}._xmin'),\
feature.getAttribute('_list{}._ymin'),\
feature.getAttribute('_list{}._xmax'),\
feature.getAttribute('_list{}._ymax'))
-> SyntaxError: newfeature = await main(urls_and_coords)
self.pyoutput(newfeature)
def close(self):
pass
async def main(urls):
loop = asyncio.get_event_loop()
async with aiohttp.ClientSession(loop=loop) as session:
feature = loop.run_until_complete(fetch_all(session, urls, loop))
return feature
async def fetch_all(session, urls, loop):
results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
return results
async def fetch(session, url):
with async_timeout.timeout(10):
async with session.get(url[0]) as response:
newFeature = fmeobjects.FMEFeature()
response_data = await response
newFeature.setAttribute('response', response_data)
newFeature.setAttribute('_xmin',url[1])
newFeature.setAttribute('_xmax',url[2])
newFeature.setAttribute('_ymin',url[3])
newFeature.setAttribute('_ymax',url[4])
return newFeature
Я попытался внести эти изменения: импорт fme импорт fmeobjects импорт asyncio импорт aiohttp импорт async_timeout logger = fmeobjects.FMELogFile()
class FeatureProcessor(object):
def __init__(self):
pass
def input(self, feature):
urls_and_coords = zip(feature.getAttribute('_list{}._wms'),\
feature.getAttribute('_list{}._xmin'),\
feature.getAttribute('_list{}._ymin'),\
feature.getAttribute('_list{}._xmax'),\
feature.getAttribute('_list{}._ymax'))
loop = asyncio.get_event_loop()
result = loop.run_until_complete(main(loop, urls_and_coords))
#feature.setAttribute('result',result)
self.pyoutput(feature)
def close(self):
pass
async def main(loop, urls):
async with aiohttp.ClientSession(loop=loop) as session:
return await fetch_all(session, urls, loop)
async def fetch_all(session, urls, loop):
results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
return results
async def fetch(session, url):
with async_timeout.timeout(10):
async with session.get(url[0]) as response:
#newFeature = fmeobjects.FMEFeature()
response = await response
#newFeature.setAttribute('response', response_data)
#newFeature.setAttribute('_xmin',url[1])
#newFeature.setAttribute('_xmax',url[2])
#newFeature.setAttribute('_ymin',url[3])
#newFeature.setAttribute('_ymax',url[4])
return response, url[1], url[2], url[3], url[4]
но теперь я в конечном итоге с этой ошибкой:
Python Exception <TypeError>: object ClientResponse can't be used in 'await'
expression
Traceback (most recent call last):
File "<string>", line 20, in input
File "asyncio\base_events.py", line 467, in run_until_complete
File "<string>", line 29, in main
File "<string>", line 33, in fetch_all
File "<string>", line 41, in fetch
TypeError: object ClientResponse can't be used in 'await' expression
4 ответа
Ответ @deceze - это, вероятно, лучшее, что вы можете сделать в Python 3.6. Но в Python 3.7 вы можете напрямую использовать asyncio.run следующим образом:
newfeature = asyncio.run(main(urls))
Он будет правильно создавать, обрабатывать и закрывать event_loop
,
Вы бы использовали цикл обработки событий для выполнения асинхронной функции до завершения:
newfeature = asyncio.get_event_loop().run_until_complete(main(urls_and_coords))
(Эта техника уже используется внутри main
, И я не уверен почему, так как main
является async
Вы могли / должны использовать await fetch_all(...)
там.)
Другой вариант, который может быть полезен, - это syncer
Пакет PyPI:
from syncer import sync
@sync
async def print_data():
print(await get_data())
print_data() # Can be called synchronously
Мне удалось заставить это работать на чистом Python 3.10, используя встроенныйasyncio.run_coroutine_threadsafe
.
Это ново для меня, поэтому, вероятно, есть некоторые предостережения, например, поскольку асинхронный метод на самом деле не ожидается, процесс может (будет ) завершиться до завершения обратного вызова ( если вы не сделаете что-то, чтобы гарантировать, что этого не произойдет ).
Информацию о том, где это может произойти в дикой природе, см.bleak
Класс библиотеки BLEBleakClient
метод обратного вызоваdisconnected_callback
. Затем в обратном вызове попробуйтеemit
используя асинхронную версиюsocket.io
клиент,AsyncClient
.
Краткая проблема/решение:
import asyncio
from typing import Callable
Callback = Callable[[int], None]
class SomeSystem:
"""Some library you don't control that is mostly async, but provides a callback that
is _not_ async."""
def __init__(self, callback: Callback):
self._callback = callback
async def do_something(self):
"""do some work and then call the non-async callback"""
await asyncio.sleep(1.0)
self._callback(1)
await asyncio.sleep(1.0)
self._callback(2)
async def some_async_method(value: int):
"""some long-running operation normally called by async code"""
await asyncio.sleep(0.1)
print(f"long-running: {value}")
async def main():
"""main is async and started as normal with asyncio.run"""
print("BEGIN main")
loop = asyncio.get_running_loop()
def cb(value: int) -> None:
"""This method _cannot_ be async, due to the underlying implementation of SomeSystem."""
# some_async_method(value) # RuntimeWarning: coroutine 'some_async_method' was never awaited
asyncio.run_coroutine_threadsafe(some_async_method(value), loop) # okay
system = SomeSystem(cb)
await system.do_something()
# maybe ensure the last call to async method is awaited? Without this call, the final callback
# won't be handled, since it's never being awaited. If anyone knows how to properly wait
# for this, let me know in the comments!
await asyncio.sleep(1.0)
print("END main")
if __name__ == "__main__":
asyncio.run(main())
Выход
BEGIN main
long-running: 1
long-running: 2
END main