Логика / поток приложения для синхронных / асинхронных задач с UrWid (Python)

Я застрял со специфической проблемой при написании приложения, которое принимает дамп базы данных в качестве входных данных, а затем позволяет пользователю выполнять небольшой набор операций с импортированными данными (например, экспортировать подмножества или полные данные в формате CSV или XLS или искать конкретные наборы данных и поиск связанных записей).

Приложение разработано для нескольких целей: в качестве интерактивного инструмента с базовыми Urwid Пользовательский интерфейс, как инструмент командной строки для запуска из cron и, как дополнительное преимущество, я хотел бы использовать его в качестве библиотеки для Django Веб-приложение для обработки массовых импортов / обновлений.

Несколько частей уже работают, например, чтение файла дампа (> 3 000 000 строк) и загрузка из него таблиц (> 300). Я также могу искать в импортированных данных и распечатать данные на стандартный вывод.

В настоящее время я создаю пользовательский интерфейс для терминала с Urwid, При запуске приложения и инициализации данных, это занимает примерно 20 - 30 секунд, пока все 3 мельницы. строки обрабатываются. В это время я показываю заставку и строку состояния со счетчиком, а также короткое текстовое сообщение с количеством уже обработанных строк.

Я обнаружил, что Urwid использует основной поток, поэтому для обновления строки состояния при подготовке данных я создал потоки, которые используют событие, сигнализирующее, что загрузка файла завершена и можно начать подготовку базы данных:

    threading.Thread(
        target=prepare_tables, args=[
            self.stop_ev,
            self.file_available,
            self.task_done,
            self.tables,
            self.rows,
            self.status
        ],
        name='prepare_tables',
    ).start()
    threading.Thread(
        target=read_file, args=[
            self.stop_ev,
            self.file_available,
            self.inputfile,
            self.rows,
            self.status
        ],
        name='read_file',
    ).start()

Обратите внимание, что prepare_tables поток знает о втором событии, task_done который я изобрел, чтобы сигнализировать о том, что долго выполняющаяся асинхронная задача (например, файловый ввод / вывод) завершена. В настоящее время я ожидаю, что только одна долго выполняющаяся задача будет обрабатываться асинхронно в любой момент времени, большая часть приложения может быть синхронной (т.е. блокировать).

Для обработки сообщений о состоянии я написал Status класс, который запускает собственный поток и передает сообщения о состоянии в очередь (для краткости код удален):

# coding: utf-8
import itertools
import threading

# Constants
CHAR = itertools.cycle('|/-\\')


def send_status(status):
    """send timestamp to queue 5 times a second"""
    while not status.stop_ev.wait(timeout=0.2):
        if status.has_update:
            status.spinner = next(CHAR)
            status.msg_queue.put(status)
            status.has_update = False


class Status:
    def __init__(self, msg_queue, stop_ev):
        self.data = {}
        self.msg_queue = msg_queue
        self.stop_ev = stop_ev
        self.has_update = False
        self.text = ''
        self.spinner = ''
        # Initialize status update thread
        threading.Thread(
            target=send_status, args=[self],
            name='send_status',
        ).start()

    def update(self, msg, **kwargs):
        if not msg:
            return
        text = msg + points]
        if 'ext' in kwargs:
            extras = len(kwargs['ext'])
            text += ' ('
            i = 0
            for k, v in kwargs['ext'].items():
                i += 1
                if i < extras:
                    pad = ', '
                else:
                    pad = ''
                text += str(v) + ' ' + k + pad
            text += ')'
        if text:
            self.text = text
            self.has_update = True

Дисплей обрабатывается универсальным Interface учебный класс:

# coding: utf-8
from .View import View

import queue
import urwid


def unhandled_input(k):
    if k == 'esc':
        raise urwid.ExitMainLoop()


class Interface:
    palette = [
...
]

    def __init__(self, msg_queue, task_done):
        self.view = View()
        self.loop = urwid.MainLoop(self.view.content, self.palette,
                                   unhandled_input=unhandled_input)
        self.msg_queue = msg_queue
        self.task_done = task_done
        self.view.update(body='splash_screen')
        self.check_messages(self.loop, None)

    def check_messages(self, loop, *_args):
        """add message to bottom of screen"""
        loop.set_alarm_in(
            sec=0.2,
            callback=self.check_messages,
        )
        try:
            status = self.msg_queue.get_nowait()
        except queue.Empty:
            return
        self.view.update(status=status)

Наконец то Interface начинается с моего основного класса (который запускает read_file и prepare_tables потоки):

    Interface(self.msg_queue, self.task_done).loop.run()

Status поток подает очередь в то время как check_messages метод Interface класс потребляет его для обновления строки состояния. Пока здесь все работает как положено.

Чего я не смог достичь, так это выйти за рамки, когда база данных стала готова. Мне удалось изменить строку состояния в последний раз, чтобы указать, сколько таблиц / строк было прочитано. Тем не менее, это делается как последний шаг от prepare_tables поток, так что это часть асинхронной операции.

Далее я хотел бы заменить заставку моим меню приложения, которое позволяет пользователю выбрать следующее действие (например, поиск некоторых данных или экспорт таблиц в формате CSV или...).

Тем не менее, так как Interface работает в главном потоке, работает run() метод loop блоки, пока я не выйду из заставки с помощью клавиши escape. Вот журнал типичного прогона:

2018-05-04 16:43:14    DEBUG: [MainThread    ] dumptool.__init__: start
2018-05-04 16:43:14    DEBUG: [MainThread    ] status.__init__: start
2018-05-04 16:43:14    DEBUG: [MainThread    ] status.__init__: stop
2018-05-04 16:43:14    DEBUG: [MainThread    ] dumptool.setup: start
2018-05-04 16:43:14    DEBUG: [prepare_tables] dumptool.prepare_tables: start
2018-05-04 16:43:14     INFO: [prepare_tables] waiting for file content to become available...
2018-05-04 16:43:14    DEBUG: [read_file     ] dumptool.read_file: start
2018-05-04 16:43:14    DEBUG: [MainThread    ] dumptool.run_ui: start
2018-05-04 16:43:14    DEBUG: [MainThread    ] Interface.__init__: start
2018-05-04 16:43:14    DEBUG: [MainThread    ] View.__init__: start
2018-05-04 16:43:14    DEBUG: [MainThread    ] Header.__init__: start
2018-05-04 16:43:14    DEBUG: [MainThread    ] Header.__init__: stop
2018-05-04 16:43:14    DEBUG: [MainThread    ] Footer.__init__: start
2018-05-04 16:43:14    DEBUG: [MainThread    ] FStatus.__init__: start
2018-05-04 16:43:14    DEBUG: [MainThread    ] FStatus.__init__: stop
2018-05-04 16:43:14    DEBUG: [MainThread    ] FHelp.__init__: start
2018-05-04 16:43:14    DEBUG: [MainThread    ] FHelp.__init__: stop
2018-05-04 16:43:14    DEBUG: [MainThread    ] Footer.__init__: stop
2018-05-04 16:43:14    DEBUG: [MainThread    ] View.__init__: stop
2018-05-04 16:43:14    DEBUG: [MainThread    ] SplashScreen.__init__: start
2018-05-04 16:43:14    DEBUG: [MainThread    ] SplashScreen.__init__: stop
2018-05-04 16:43:14    DEBUG: [MainThread    ] Interface.__init__: stop
2018-05-04 16:43:33    DEBUG: [read_file     ] dumptool.read_file: stop
2018-05-04 16:43:33     INFO: [prepare_tables] file available, building db
2018-05-04 16:44:00     INFO: [prepare_tables] Database ready, read 323 tables with 3225848 rows total
2018-05-04 16:44:00    DEBUG: [prepare_tables] dumptool.prepare_tables: stop
2018-05-04 16:44:10    DEBUG: [MainThread    ] Interface.unhandled_input: start
2018-05-04 16:44:10    DEBUG: [MainThread    ] Interface.unhandled_input: stop
2018-05-04 16:44:11    DEBUG: [MainThread    ] dumptool.stop_threads: stopping threads
2018-05-04 16:44:11    DEBUG: [MainThread    ] dumptool.run_ui: stop

Я прочитал все Urwid Документы, которые я мог найти, я также искал похожие подходы здесь, на stackru и в других местах, но я не нашел полезных подсказок о том, как действовать дальше.

Единственное, что сработало на полпути, - это использовать check_messages() метод также проверить, если task_done событие установлено, и если так, вызовите другой метод, который управлял интерфейсом.

Однако у меня сложилось впечатление, что это не является ни предпочтительным, ни питонским способом, а также не предназначенным для Urwid Пользовательский интерфейс, который, в конце концов, имеет run() метод, который должен быть там, где находится логика приложения.

Но, как я ни старался, я не нашел способа правильно подключить или переопределить run() метод. Кто-нибудь из вас может указать мне направление работы? Делал ли я некоторые ложные предположения по пути при разработке приложения? Любая помощь очень ценится, спасибо заранее!

0 ответов

Другие вопросы по тегам