Как тестировать asyncio потоки (сопрограммы, задачи)?
Что мне делать, чтобы проверить login
сопрограмма?
class Client:
def __init__(self, config=None):
self.config = config or ('0.0.0.0', 8080)
self.reader = None
self.writer = None
self.connection = asyncio.create_task(self._connect())
async def _connect(self):
self.reader, self.writer = await asyncio.open_connection(*self.config)
async def login(self, username, password):
await self.connection # 2
md5 = hashlib.md5()
md5.update(username.encode(ENCODING) + password.encode(ENCODING))
hash_ = md5.hexdigest()
# `construct` returns message, in bytes, based on parameters.
login_message = construct(1, username, password, 182, hash_, 157)
self.writer.write(login_message) # 1
await self.writer.drain()
В частности, я хочу проверить, что сообщение для входа всегда отправляется в правильном формате (1), и, возможно, также тот факт, что соединение должно быть установлено сначала перед отправкой сообщения (2). Кроме того, я не могу запустить локальную копию сервера, если это имеет значение.
Это возможно? Стоит ли мне вообще это проверить?
я использую pytest
, Я пробовал использовать библиотеку под названием pytest-asyncio
, но я просто не понимаю, как что-то подобное тестировать.