Порождение процессов и связь между процессами в приложении Python на основе трио

Для стажировки в FluidImage библиотеки Python мы исследуем, может ли быть хорошей идеей написать параллельное приложение HPC с моделью клиент / сервер с использованием трио библиотеки.

Для асинхронного программирования и ввода / вывода трио действительно великолепно!

Тогда мне интересно, как

  1. процессы порождения (серверы, выполняющие ограниченную работу CPU-GPU)
  2. передача сложных объектов Python (потенциально содержащих большие массивы) между процессами.

Я не нашел того, что было рекомендовано для этого с помощью trio в его документации (даже если учебник по эхо-клиенту / серверу - хорошее начало).

Одним из очевидных способов порождения процессов в Python и общения является использование многопроцессорной обработки.

В контексте HPC, я думаю, одно хорошее решение было бы использовать MPI ( http://mpi4py.readthedocs.io/en/stable/overview.html). Для справки я также должен упомянуть rpyc ( https://rpyc.readthedocs.io/en/latest/docs/zerodeploy.html).

Я не знаю, можно ли использовать такие инструменты вместе с трио и как правильно это сделать.

Интересный связанный вопрос

Замечание PEP 574

Мне кажется, что PEP 574 (см. https://pypi.org/project/pickle5/) также может быть хорошим решением этой проблемы.

5 ответов

К сожалению, на сегодняшний день (июль 2018 года) Trio еще не поддерживает порождение и связь с подпроцессами или какими-либо высокоуровневыми оболочками для MPI или других высокоуровневых межпроцессных протоколов координации.

Это определенно то, к чему мы в конечном итоге хотим добраться, и если вы хотите поговорить более подробно о том, что должно быть реализовано, то вы можете перейти в наш чат, или в этом выпуске есть обзор того, что необходимо для поддержки основных подпроцессов. Но если ваша цель состоит в том, чтобы в течение нескольких месяцев что-то сработало для вашей стажировки, то, честно говоря, вы можете рассмотреть более зрелые инструменты HPC, такие как dask.

По состоянию на середину 2018 года, Трио еще не сделал этого. Наилучшим вариантом на сегодняшний день является использование trio_asyncio использовать поддержку Asyncio для функций, которые Трио еще нужно изучить.

Я публикую очень наивный пример кода с использованием многопроцессорной обработки и трио (в основной программе и на сервере). Вроде работает.

from multiprocessing import Process, Queue
import trio
import numpy as np

async def sleep():
    print("enter sleep")
    await trio.sleep(0.2)
    print("end sleep")

def cpu_bounded_task(input_data):
    result = input_data.copy()
    for i in range(1000000-1):
        result += input_data
    return result

def server(q_c2s, q_s2c):
    async def main_server():
        # get the data to be processed
        input_data = await trio.run_sync_in_worker_thread(q_c2s.get)
        print("in server: input_data received", input_data)
        # a CPU-bounded task
        result = cpu_bounded_task(input_data)
        print("in server: sending back the answer", result)
        await trio.run_sync_in_worker_thread(q_s2c.put, result)

    trio.run(main_server)

async def client(q_c2s, q_s2c):
    input_data = np.arange(10)
    print("in client: sending the input_data", input_data)
    await trio.run_sync_in_worker_thread(q_c2s.put, input_data)
    result = await trio.run_sync_in_worker_thread(q_s2c.get)
    print("in client: result received", result)

async def parent(q_c2s, q_s2c):
    async with trio.open_nursery() as nursery:
        nursery.start_soon(sleep)
        nursery.start_soon(client, q_c2s, q_s2c)
        nursery.start_soon(sleep)

def main():
    q_c2s = Queue()
    q_s2c = Queue()
    p = Process(target=server, args=(q_c2s, q_s2c))
    p.start()
    trio.run(parent, q_c2s, q_s2c)
    p.join()

if __name__ == '__main__':
    main()

Вы также можете проверитьtractorкоторый, наконец, похоже, выпустил первый альфа-релиз.

он имеет встроенную систему RPC, ориентированную на функции (очень похожую на trio) с использованием TCP и msgpack(но я думаю, что у них запланировано больше транспорта). Вы просто вызываете функции в других процессах напрямую и передаете/получаете результаты различными способами.

Вот их первый пример:

      """
Run with a process monitor from a terminal using::

    $TERM -e watch -n 0.1  "pstree -a $$" \
        & python examples/parallelism/single_func.py \
        && kill $!

"""
import os

import tractor
import trio


async def burn_cpu():

    pid = os.getpid()

    # burn a core @ ~ 50kHz
    for _ in range(50000):
        await trio.sleep(1/50000/50)

    return os.getpid()


async def main():

    async with tractor.open_nursery() as n:

        portal = await n.run_in_actor(burn_cpu)

        #  burn rubber in the parent too
        await burn_cpu()

        # wait on result from target function
        pid = await portal.result()

    # end of nursery block
    print(f"Collected subproc {pid}")


if __name__ == '__main__':
    trio.run(main)

Простой пример с mpi4py... Это может быть плохая работа с точки зрения трио, но, похоже, работает.

Связь осуществляется с trio.run_sync_in_worker_thread поэтому ( как написал Натаниэль Дж. Смит) (1) нет отмены (и нет поддержки control-C) и (2) используется больше памяти, чем задач трио (но один поток Python не использует так много памяти).

Но для обмена данными, включающими большие массивы, я бы так поступил, так как обмен данными с буферами будет очень эффективным с mpi4py.

import sys
from functools import partial

import trio

import numpy as np
from mpi4py import MPI

async def sleep():
    print("enter sleep")
    await trio.sleep(0.2)
    print("end sleep")

def cpu_bounded_task(input_data):
    print("cpu_bounded_task starting")
    result = input_data.copy()
    for i in range(1000000-1):
        result += input_data
    print("cpu_bounded_task finished ")
    return result

if "server" not in sys.argv:
    comm = MPI.COMM_WORLD.Spawn(sys.executable,
                                args=['trio_spawn_comm_mpi.py', 'server'])

    async def client():
        input_data = np.arange(4)
        print("in client: sending the input_data", input_data)
        send = partial(comm.send, dest=0, tag=0)
        await trio.run_sync_in_worker_thread(send, input_data)

        print("in client: recv")
        recv = partial(comm.recv, tag=1)
        result = await trio.run_sync_in_worker_thread(recv)
        print("in client: result received", result)

    async def parent():
        async with trio.open_nursery() as nursery:
            nursery.start_soon(sleep)
            nursery.start_soon(client)
            nursery.start_soon(sleep)

    trio.run(parent)

    print("in client, end")
    comm.barrier()

else:
    comm = MPI.Comm.Get_parent()

    async def main_server():
        # get the data to be processed
        recv = partial(comm.recv, tag=0)
        input_data = await trio.run_sync_in_worker_thread(recv)
        print("in server: input_data received", input_data)
        # a CPU-bounded task
        result = cpu_bounded_task(input_data)
        print("in server: sending back the answer", result)
        send = partial(comm.send, dest=0, tag=1)
        await trio.run_sync_in_worker_thread(send, result)

    trio.run(main_server)
    comm.barrier()
Другие вопросы по тегам