Реактивный цикл событий в Python

Я пытаюсь построить систему, которая собирает данные из некоторых источников с помощью ввода-вывода (HDD, сеть...)

Для этого у меня есть класс (контроллер), который запускает коллекторы.

Каждый коллектор представляет собой бесконечный цикл с классическим процессом ETL (извлечение, преобразование и загрузка).

Я хочу отправить некоторые команды коллекторам (остановить, перезагрузить настройки...) из интерфейса (CLI, web...), и я не уверен, как это сделать.

Например, это скелет для коллекционера:

class Collector(object):
    def __init__(self):
        self.reload_settings()

    def reload_settings(self):
        # Get the settings
        # Set the settings as attributes

    def process_data(self, data):
        # Do something

    def run(self):
        while True:
            data = retrieve_data()
            self.process_data(data)

И это скелет для контроллера:

class Controller(object):
    def __init__(self, collectors):
        self.collectors = collectors

    def run(self):
        for collector in collectors:
            collector.run()

    def reload_settings(self):
        ??

    def stop(self):
        ??

Существует ли классический шаблон проектирования, который решает эту проблему (публикация-подписка, цикл обработки событий, реактор...)? Как лучше всего решить эту проблему?

П.Д.: Очевидно, что это будет многопроцессорное приложение, которое будет работать на одной машине.

2 ответа

Здесь есть несколько вариантов, но они сводятся к двум основным типам: кооперативный (цикл событий / реактор / сопрограмма / явный гринлет) или упреждающий (неявный гринлет / нить / многопроцессный).

Первый требует гораздо больше реструктуризации ваших коллекционеров. Это может быть хорошим способом сделать недетерминизм явным или достичь массового параллелизма, но ни один из них не кажется здесь уместным. Второй просто требует прикрепления коллекторов к потокам и использования некоторого механизма синхронизации как для обмена данными, так и для обмена данными. Кажется, что у вас нет общих данных, и ваше общение тривиально и не очень чувствительно ко времени. Итак, я бы пошел с потоками.

Если предположить, что вы хотите использовать потоки в общем смысле, предположить, что ваши сборщики связаны с вводом-выводом, а у вас их нет, я бы остановился на реальных потоках.

Итак, вот один способ, которым вы можете написать это:

class Collector(threading.Thread):
    def __init__(self):
        self._reload_settings()
        self._need_reload = threading.Event()
        self._need_stop = threading.Event()

    def _reload_settings(self):
        # Get the settings
        # Set the settings as attributes
        self._need_reload.clear()

    def reload_settings(self):
        self._need_reload.set()

    def stop(self):
        self._need_stop.set()

    def process_data(self, data):
        # Do something

    def run(self):
        while not self._need_stop.is_set():
            if self._need_reload.is_set():
                self._reload_settings()
            data = retrieve_data()
            self.process_data(data)

class Controller(object):
    def __init__(self, collectors):
        self.collectors = collectors

    def run(self):
        for collector in self.collectors:
            collector.start()

    def reload_settings(self):
        for collector in self.collectors:
            collector.reload_settings()

    def stop(self):
        for collector in self.collectors:
            collector.stop()
        for collector in self.collectors:
            collector.join()

(Хотя я бы назвал Controller.run метод stopпотому что он лучше вписывается в названия, используемые не только Thread, но также с помощью классов сервера stdlib и других подобных вещей.)

Я бы посмотрел на возможность адаптации вашего случая к архитектуре клиент-сервер на основе сокетов, в которой Controller мог бы создать необходимое количество коллекторов, каждый из которых прослушивал собственный порт и обрабатывал полученные данные более элегантным способом с помощью метода handle() сервера. Тот факт, что данные поступают из различных источников ввода-вывода, еще больше говорит об этом решении - вы можете использовать клиентскую часть этой архитектуры для стандартизации протокола DataSource -> Collector.

https://docs.python.org/2/library/socketserver.html

Другие вопросы по тегам