Как я могу уведомить наблюдателей RxPY в отдельных потоках, используя asyncio?

(Примечание: фон этой проблемы довольно многословен, но внизу есть SSCCE, к которому можно пропустить)

Фон

Я пытаюсь разработать CLI на основе Python для взаимодействия с веб-сервисом. В моей кодовой базе у меня есть CommunicationService класс, который обрабатывает все прямое общение с веб-сервисом. Это подвергает received_response свойство, которое возвращает Observable (из RxPY), на который могут подписаться другие объекты, чтобы получать уведомления о получении ответов от веб-службы.

Я основал свою логику CLI на click библиотека, где одна из моих подкоманд реализована следующим образом:

async def enabled(self, request: str, response_handler: Callable[[str], Tuple[bool, str]]) -> None:
    self._generate_request(request)
    if response_handler is None:
        return None

    while True:
        response = await self.on_response
        success, value = response_handler(response)
        print(success, value)
        if success:
            return value

Что здесь происходит (в случае, если response_handler не является None) подкоманда ведет себя как сопрограмма, ожидающая ответов от веб-службы (self.on_response == CommunicationService.received_response) и возвращает обработанное значение из первого ответа, который он может обработать.

Я пытаюсь проверить поведение моего CLI, создав контрольные примеры, в которых CommunicationService полностью высмеивается; подделка Subject создается (который может действовать как Observable) а также CommunicationService.received_response издевается, чтобы вернуть его. В рамках теста субъект on_next метод вызывается для передачи ложных ответов веб-службы обратно в производственный код:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    context.mock_received_response_subject.on_next(context.text)

Я использую функцию click 'result callback', которая вызывается в конце вызова CLI и блокируется, пока не будет выполнена сопрограмма (подкоманда):

@cli.resultcallback()
def _handle_command_task(task: Coroutine, **_) -> None:
    if task:
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(task)
        loop.close()
        print('RESULT:', result) 

проблема

В начале теста я бегу CliRunner.invoke отстрелить весь шебанг. Проблема в том, что это блокирующий вызов, и он будет блокировать поток до тех пор, пока CLI не завершит работу и не вернет результат, что бесполезно, если мне нужно продолжить выполнение моего тестового потока, чтобы он мог одновременно генерировать ложные ответы веб-службы.

Я думаю, мне нужно бежать CliRunner.invoke в новой теме, используя ThreadPoolExecutor, Это позволяет тестовой логике продолжить работу в исходном потоке и выполнить @when шаг размещен выше. Тем не менее, уведомления, опубликованные с mock_received_response_subject.on_next похоже, что не запускается выполнение внутри подкоманды.

Я считаю, что решение будет включать использование RxPY AsyncIOScheduler, но я нахожу документацию по этому немного скудной и бесполезной.

SSCCE

Ниже приведен фрагмент, который, как я надеюсь, является сутью проблемы. Если это можно изменить для работы, я смогу применить то же решение к моему фактическому коду, чтобы заставить его работать так, как я хочу.

import asyncio
import logging
import sys
import time

import click
from click.testing import CliRunner
from rx.subjects import Subject

web_response_subject = Subject()
web_response_observable = web_response_subject.as_observable()

thread_loop = asyncio.new_event_loop()


@click.group()
def cli():
    asyncio.set_event_loop(thread_loop)


@cli.resultcallback()
def result_handler(task, **_):
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(task) # Should block until subject publishes value
    loop.close()

    print(result)


@cli.command()
async def get_web_response():
    return await web_response_observable


def test():
    runner = CliRunner()
    future = thread_loop.run_in_executor(None, runner.invoke, cli, ['get_web_response'])
    time.sleep(1)
    web_response_subject.on_next('foo') # Simulate reception of web response.
    time.sleep(1)
    result = future.result()
    print(result.output)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(threadName)10s %(name)18s: %(message)s',
    stream=sys.stderr,
)

test()

Текущее поведение

Программа зависает при запуске, блокируя на result = loop.run_until_complete(task),

Критерии приемки

Программа завершается и печатает foo на stdout,

Обновление 1

Основываясь на помощи Винсента, я внес некоторые изменения в свой код.

Relay.enabled (подкоманда, которая ожидает ответы от веб-службы для их обработки), теперь реализована следующим образом:

async def enabled(self, request: str, response_handler: Callable[[str], Tuple[bool, str]]) -> None:
    self._generate_request(request)

    if response_handler is None:
        return None

    return await self.on_response \
        .select(response_handler) \
        .where(lambda result, i: result[0]) \
        .select(lambda result, index: result[1]) \
        .first()

Я не был уверен, как await будет вести себя с RxPY наблюдаемые - будут ли они возвращать выполнение вызывающей стороне для каждого сгенерированного элемента или только после того, как наблюдаемое завершено (или с ошибкой?). Теперь я знаю, что это последнее, что, честно говоря, кажется более естественным выбором и позволило мне сделать реализацию этой функции более элегантной и реактивной.

Я также изменил шаг теста, который генерирует ложные ответы веб-службы:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    loop = asyncio.get_event_loop()
    loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)

К сожалению, это не будет работать так, как есть, поскольку CLI вызывается в своем собственном потоке...

@when('the CLI is run with "{arguments}"')
def step_impl(context, arguments):
    loop = asyncio.get_event_loop()
    if 'async.cli' in context.tags:
        context.async_result = loop.run_in_executor(None, context.cli_runner.invoke, testcube.cli, arguments.split())
    else:
        ...

И CLI создает свой собственный поток-частный цикл событий при вызове...

def cli(context, hostname, port):
    _initialize_logging(context.meta['click_log.core.logger']['level'])

    # Create a new event loop for processing commands asynchronously on.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    ...

Я думаю, что мне нужен способ, позволяющий моим шагам теста вызывать CLI в новом потоке и затем извлекать цикл событий, который он использует:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    loop = _get_cli_event_loop() # Needs to be implemented.
    loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)

Обновление 2

Кажется, не существует простого способа получить цикл обработки событий, который конкретный поток создает и использует для себя, поэтому вместо этого я воспользовался советом Виктора и высмеял его. asyncio.new_event_loop чтобы вернуть цикл обработки событий, который мой тестовый код создает и хранит:

def _apply_mock_event_loop_patch(context):
    # Close any already-existing exit stacks.
    if hasattr(context, 'mock_event_loop_exit_stack'):
        context.mock_event_loop_exit_stack.close()

    context.test_loop = asyncio.new_event_loop()
    print(context.test_loop)
    context.mock_event_loop_exit_stack = ExitStack()
    context.mock_event_loop_exit_stack.enter_context(
        patch.object(asyncio, 'new_event_loop', spec=True, return_value=context.test_loop))

Я изменяю свой тестовый шаг "Получен поддельный веб-ответ", чтобы выполнить следующее:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    loop = context.test_loop
    loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)

Хорошая новость в том, что я на самом деле получаю Relay.enabled сопрограмма для запуска, когда этот шаг выполняется!

Единственная проблема сейчас - последний шаг теста, в котором я жду будущего, которое я получил от выполнения CLI в его собственном потоке, и проверяю, что CLI отправляет это на stdout:

@then('the CLI should print "{output}"')
def step_impl(context, output):
    if 'async.cli' in context.tags:
        loop = asyncio.get_event_loop() # main loop, not test loop
        result = loop.run_until_complete(context.async_result)
    else:
        result = context.result
    assert_that(result.output, equal_to(output))

Я пытался поиграть с этим, но я не могу получить context.async_result (который хранит будущее от loop.run_in_executor) переходить приятно done и вернуть результат. При текущей реализации я получаю ошибку для первого теста (1.1) и неопределенное зависание для второго (1.2):

 @mock.comms @async.cli @wip
  Scenario Outline: Querying relay enable state -- @1.1                           # testcube/tests/features/relay.feature:45
    When the user queries the enable state of relay 0                             # testcube/tests/features/steps/relay.py:17 0.003s
    Then the CLI should query the web service about the enable state of relay 0   # testcube/tests/features/steps/relay.py:48 0.000s
    When the communications service receives a response from TestCube Web Service # testcube/tests/features/steps/core.py:58 0.000s
      """
      {'module':'relays','path':'relays[0].enabled','data':[True]}'
      """
    Then the CLI should print "True"                                              # testcube/tests/features/steps/core.py:94 0.003s
      Traceback (most recent call last):
        File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/behave/model.py", line 1456, in run
          match.run(runner.context)
        File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/behave/model.py", line 1903, in run
          self.func(context, *args, **kwargs)
        File "testcube/tests/features/steps/core.py", line 99, in step_impl
          result = loop.run_until_complete(context.async_result)
        File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
          return future.result()
        File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 274, in result
          raise self._exception
        File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/concurrent/futures/thread.py", line 55, in run
          result = self.fn(*self.args, **self.kwargs)
        File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/click/testing.py", line 299, in invoke
          output = out.getvalue()
      ValueError: I/O operation on closed file.

      Captured stdout:
      RECEIVED WEB RESPONSE: {'module':'relays','path':'relays[0].enabled','data':[True]}'
      <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py:431]>

  @mock.comms @async.cli @wip
  Scenario Outline: Querying relay enable state -- @1.2                           # testcube/tests/features/relay.feature:46
    When the user queries the enable state of relay 1                             # testcube/tests/features/steps/relay.py:17 0.005s
    Then the CLI should query the web service about the enable state of relay 1   # testcube/tests/features/steps/relay.py:48 0.001s
    When the communications service receives a response from TestCube Web Service # testcube/tests/features/steps/core.py:58 0.000s
      """
      {'module':'relays','path':'relays[1].enabled','data':[False]}'
      """
RECEIVED WEB RESPONSE: {'module':'relays','path':'relays[1].enabled','data':[False]}'
    Then the CLI should print "False"                                             # testcube/tests/features/steps/core.py:94

Глава 3: Финал

Винт все эти асинхронные многопоточные вещи, я слишком тупой для этого.

Во-первых, вместо того, чтобы описывать сценарий, как это...

When the user queries the enable state of relay <relay_id>
Then the CLI should query the web service about the enable state of relay <relay_id>
When the communications service receives a response from TestCube Web Service:
  """
  {"module":"relays","path":"relays[<relay_id>].enabled","data":[<relay_enabled>]}
  """
Then the CLI should print "<relay_enabled>"

Мы описываем это так:

Given the communications service will respond to requests:
  """
  {"module":"relays","path":"relays[<relay_id>].enabled","data":[<relay_enabled>]}
  """
When the user queries the enable state of relay <relay_id>
Then the CLI should query the web service about the enable state of relay <relay_id>
And the CLI should print "<relay_enabled>"

Реализуйте новый шаг:

@given('the communications service will respond to requests')
def step_impl(context):
    response = context.text

    def publish_mock_response(_):
        loop = context.test_loop
        loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, response)

    # Configure the mock comms service to publish a mock response when a request is made.
    instance = context.mock_comms.return_value
    instance.send_request.on_next.side_effect = publish_mock_response

БУМ

2 features passed, 0 failed, 0 skipped
22 scenarios passed, 0 failed, 0 skipped
58 steps passed, 0 failed, 0 skipped, 0 undefined
Took 0m0.111s

1 ответ

Решение

Я вижу две проблемы с вашим кодом:

  • asyncio не является поточно-ориентированным, если вы не используете call_soon_threadsafe или run_coroutine_threadsafe. RxPy не использует ни одного из них в Observable.to_future, поэтому вы должны получить доступ RxPy объекты в том же потоке, который выполняет цикл событий asyncio.
  • RxPy устанавливает результат будущего, когда on_completed вызывается, так что ожидание наблюдаемой возвращает последний испущенный объект. Это означает, что вы должны позвонить обоим on_next а также on_completed получить await возвращать.

Вот рабочий пример:

import click
import asyncio
from rx.subjects import Subject
from click.testing import CliRunner

web_response_subject = Subject()
web_response_observable = web_response_subject.as_observable()
main_loop = asyncio.get_event_loop()

@click.group()
def cli():
    pass

@cli.resultcallback()
def result_handler(task, **_):
    future = asyncio.run_coroutine_threadsafe(task, main_loop)
    print(future.result())

@cli.command()
async def get_web_response():
    return await web_response_observable

def test():
    runner = CliRunner()
    future = main_loop.run_in_executor(
        None, runner.invoke, cli, ['get_web_response'])
    main_loop.call_later(1, web_response_subject.on_next, 'foo')
    main_loop.call_later(2, web_response_subject.on_completed)
    result = main_loop.run_until_complete(future)
    print(result.output, end='')

if __name__ == '__main__':
    test()
Другие вопросы по тегам