Как использовать aiohttp ClientSession pool?
Документы говорят, чтобы повторно использовать ClientSession:
Не создавайте сеанс для каждого запроса. Скорее всего, вам нужен сеанс для приложения, который выполняет все запросы в целом.
Сеанс содержит пул соединений внутри, повторное использование соединения и keep-alives (оба включены по умолчанию) могут повысить общую производительность. 1
Но в документах нет объяснений, как это сделать? Есть один пример, который может быть уместным, но он не показывает, как повторно использовать пул в другом месте: http://aiohttp.readthedocs.io/en/stable/client.html
Будет ли что-то подобное быть правильным способом сделать это?
@app.listener('before_server_start')
async def before_server_start(app, loop):
app.pg_pool = await asyncpg.create_pool(**DB_CONFIG, loop=loop, max_size=100)
app.http_session_pool = aiohttp.ClientSession()
@app.listener('after_server_stop')
async def after_server_stop(app, loop):
app.http_session_pool.close()
app.pg_pool.close()
@app.post("/api/register")
async def register(request):
# json validation
async with app.pg_pool.acquire() as pg:
await pg.execute() # create unactivated user in db
async with app.http_session_pool as session:
# TODO send activation email using SES API
async with session.post('http://httpbin.org/post', data=b'data') as resp:
print(resp.status)
print(await resp.text())
return HTTPResponse(status=204)
3 ответа
Я думаю, что есть несколько вещей, которые можно улучшить:
1)
Экземпляр ClientSession
это один объект сеанса. Это в сеансе содержит пул соединений, но это не сам "session_pool". Я бы предложил переименовать http_session_pool
в http_session
или, может быть client_session
,
2)
сеанса close()
Метод - это земля. Вы должны ждать этого:
await app.client_session.close()
Или даже лучше (IMHO), вместо того, чтобы думать о том, как правильно открыть / закрыть сеанс, используйте стандартный асинхронный контекстный менеджер с ожиданием __aenter__
/ __aexit__
:
@app.listener('before_server_start')
async def before_server_start(app, loop):
# ...
app.client_session = await aiohttp.ClientSession().__aenter__()
@app.listener('after_server_stop')
async def after_server_stop(app, loop):
await app.client_session.__aexit__(None, None, None)
# ...
3)
Обратите внимание на эту информацию:
Однако, если цикл обработки событий останавливается до закрытия основного соединения,
ResourceWarning: unclosed transport
предупреждение выдается (когда предупреждения включены).Чтобы избежать этой ситуации, перед закрытием цикла событий необходимо добавить небольшую задержку, чтобы все открытые основные соединения закрылись.
Я не уверен, что это обязательно в вашем случае, но нет ничего плохого в добавлении await asyncio.sleep(0)
внутри after_server_stop
в качестве документации советы:
@app.listener('after_server_stop')
async def after_server_stop(app, loop):
# ...
await asyncio.sleep(0) # http://aiohttp.readthedocs.io/en/stable/client.html#graceful-shutdown
Upd:
Класс, который реализует __aenter__
/ __aexit__
может использоваться в качестве диспетчера асинхронного контекста (может использоваться в async with
заявление). Это позволяет выполнять некоторые действия перед выполнением внутреннего блока и после него. Это очень похоже на обычные контекстные менеджеры, но asyncio
связанные с. То же, что и обычный менеджер контекста, асинхронный, можно использовать напрямую (без async with
) вручную в ожидании __aenter__
/ __aexit__
,
Почему я думаю, что лучше создать / освободить сессию, используя __aenter__
/ __aexit__
вручную вместо использования close()
, например? Потому что мы не должны беспокоиться о том, что на самом деле происходит внутри __aenter__
/ __aexit__
, Представьте себе в будущих версиях aiohttp
создание сессии будет изменено с необходимостью ждать open()
например. Если вы будете использовать __aenter__
/ __aexit__
вам не нужно каким-то образом менять свой код.
кажется, что в aiohttp нет пула сеансов.
// просто опубликуйте официальные документы.
постоянный сеанс
вот
persistent-session
демонстрация использования на официальном сайте
https://docs.aiohttp.org/en/latest/client_advanced.html#persistent-session
app.cleanup_ctx.append(persistent_session)
async def persistent_session(app):
app['PERSISTENT_SESSION'] = session = aiohttp.ClientSession()
yield
await session.close()
async def my_request_handler(request):
session = request.app['PERSISTENT_SESSION']
async with session.get("http://python.org") as resp:
print(resp.status)
//TODO: полный рабочий демо-код
пул соединений
и у него есть пул соединений:
https://docs.aiohttp.org/en/latest/client_advanced.html#connectors
conn = aiohttp.TCPConnector()
#conn = aiohttp.TCPConnector(limit=30)
#conn = aiohttp.TCPConnector(limit=0) # nolimit, default is 100.
#conn = aiohttp.TCPConnector(limit_per_host=30) # default is 0
session = aiohttp.ClientSession(connector=conn)
Я нашел этот вопрос после поиска в Google о том, как повторно использовать экземпляр ClientSession aiohttp после того, как мой код вызвал это предупреждающее сообщение: UserWarning: Создание сеанса клиента вне сопрограммы - очень опасная идея
Этот код может не решить вышеупомянутую проблему, хотя это связано. Я новичок в Asy ncio и Aiohttp, так что это не может быть лучшей практикой. Это лучшее, что я мог придумать после прочтения множества, казалось бы, противоречивой информации.
Я создал класс ResourceManager, взятый из документов Python, который открывает контекст.
Экземпляр ResourceManager обрабатывает открытие и закрытие экземпляра aiohttp ClientSession с помощью магических методов. __aenter__
а также __aexit__
с помощью методов-обёрток BaseScraper.set_session и BaseScraper.close_session.
Мне удалось повторно использовать экземпляр ClientSession со следующим кодом.
Класс BaseScraper также имеет методы для аутентификации. Это зависит от стороннего пакета lxml.
import asyncio
from time import time
from contextlib import contextmanager, AbstractContextManager, ExitStack
import aiohttp
import lxml.html
class ResourceManager(AbstractContextManager):
# Code taken from Python docs: 29.6.2.4. of https://docs.python.org/3.6/library/contextlib.html
def __init__(self, scraper, check_resource_ok=None):
self.acquire_resource = scraper.acquire_resource
self.release_resource = scraper.release_resource
if check_resource_ok is None:
def check_resource_ok(resource):
return True
self.check_resource_ok = check_resource_ok
@contextmanager
def _cleanup_on_error(self):
with ExitStack() as stack:
stack.push(self)
yield
# The validation check passed and didn't raise an exception
# Accordingly, we want to keep the resource, and pass it
# back to our caller
stack.pop_all()
def __enter__(self):
resource = self.acquire_resource()
with self._cleanup_on_error():
if not self.check_resource_ok(resource):
msg = "Failed validation for {!r}"
raise RuntimeError(msg.format(resource))
return resource
def __exit__(self, *exc_details):
# We don't need to duplicate any of our resource release logic
self.release_resource()
class BaseScraper:
login_url = ""
login_data = dict() # dict of key, value pairs to fill the login form
loop = asyncio.get_event_loop()
def __init__(self, urls):
self.urls = urls
self.acquire_resource = self.set_session
self.release_resource = self.close_session
async def _set_session(self):
self.session = await aiohttp.ClientSession().__aenter__()
def set_session(self):
set_session_attr = self.loop.create_task(self._set_session())
self.loop.run_until_complete(set_session_attr)
return self # variable after "as" becomes instance of BaseScraper
async def _close_session(self):
await self.session.__aexit__(None, None, None)
def close_session(self):
close_session = self.loop.create_task(self._close_session())
self.loop.run_until_complete(close_session)
def __call__(self):
fetch_urls = self.loop.create_task(self._fetch())
return self.loop.run_until_complete(fetch_urls)
async def _get(self, url):
async with self.session.get(url) as response:
result = await response.read()
return url, result
async def _fetch(self):
tasks = (self.loop.create_task(self._get(url)) for url in self.urls)
start = time()
results = await asyncio.gather(*tasks)
print(
"time elapsed: {} seconds \nurls count: {}".format(
time() - start, len(urls)
)
)
return results
@property
def form(self):
"""Create and return form for authentication."""
form = aiohttp.FormData(self.login_data)
get_login_page = self.loop.create_task(self._get(self.login_url))
url, login_page = self.loop.run_until_complete(get_login_page)
login_html = lxml.html.fromstring(login_page)
hidden_inputs = login_html.xpath(r'//form//input[@type="hidden"]')
login_form = {x.attrib["name"]: x.attrib["value"] for x in hidden_inputs}
for key, value in login_form.items():
form.add_field(key, value)
return form
async def _login(self, form):
async with self.session.post(self.login_url, data=form) as response:
if response.status != 200:
response.raise_for_status()
print("logged into {}".format(url))
await response.release()
def login(self):
post_login_form = self.loop.create_task(self._login(self.form))
self.loop.run_until_complete(post_login_form)
if __name__ == "__main__":
urls = ("http://example.com",) * 10
base_scraper = BaseScraper(urls)
with ResourceManager(base_scraper) as scraper:
for url, html in scraper():
print(url, len(html))