Логика / поток приложения для синхронных / асинхронных задач с UrWid (Python)
Я застрял со специфической проблемой при написании приложения, которое принимает дамп базы данных в качестве входных данных, а затем позволяет пользователю выполнять небольшой набор операций с импортированными данными (например, экспортировать подмножества или полные данные в формате CSV или XLS или искать конкретные наборы данных и поиск связанных записей).
Приложение разработано для нескольких целей: в качестве интерактивного инструмента с базовыми Urwid
Пользовательский интерфейс, как инструмент командной строки для запуска из cron и, как дополнительное преимущество, я хотел бы использовать его в качестве библиотеки для Django
Веб-приложение для обработки массовых импортов / обновлений.
Несколько частей уже работают, например, чтение файла дампа (> 3 000 000 строк) и загрузка из него таблиц (> 300). Я также могу искать в импортированных данных и распечатать данные на стандартный вывод.
В настоящее время я создаю пользовательский интерфейс для терминала с Urwid
, При запуске приложения и инициализации данных, это занимает примерно 20 - 30 секунд, пока все 3 мельницы. строки обрабатываются. В это время я показываю заставку и строку состояния со счетчиком, а также короткое текстовое сообщение с количеством уже обработанных строк.
Я обнаружил, что Urwid использует основной поток, поэтому для обновления строки состояния при подготовке данных я создал потоки, которые используют событие, сигнализирующее, что загрузка файла завершена и можно начать подготовку базы данных:
threading.Thread(
target=prepare_tables, args=[
self.stop_ev,
self.file_available,
self.task_done,
self.tables,
self.rows,
self.status
],
name='prepare_tables',
).start()
threading.Thread(
target=read_file, args=[
self.stop_ev,
self.file_available,
self.inputfile,
self.rows,
self.status
],
name='read_file',
).start()
Обратите внимание, что prepare_tables
поток знает о втором событии, task_done
который я изобрел, чтобы сигнализировать о том, что долго выполняющаяся асинхронная задача (например, файловый ввод / вывод) завершена. В настоящее время я ожидаю, что только одна долго выполняющаяся задача будет обрабатываться асинхронно в любой момент времени, большая часть приложения может быть синхронной (т.е. блокировать).
Для обработки сообщений о состоянии я написал Status
класс, который запускает собственный поток и передает сообщения о состоянии в очередь (для краткости код удален):
# coding: utf-8
import itertools
import threading
# Constants
CHAR = itertools.cycle('|/-\\')
def send_status(status):
"""send timestamp to queue 5 times a second"""
while not status.stop_ev.wait(timeout=0.2):
if status.has_update:
status.spinner = next(CHAR)
status.msg_queue.put(status)
status.has_update = False
class Status:
def __init__(self, msg_queue, stop_ev):
self.data = {}
self.msg_queue = msg_queue
self.stop_ev = stop_ev
self.has_update = False
self.text = ''
self.spinner = ''
# Initialize status update thread
threading.Thread(
target=send_status, args=[self],
name='send_status',
).start()
def update(self, msg, **kwargs):
if not msg:
return
text = msg + points]
if 'ext' in kwargs:
extras = len(kwargs['ext'])
text += ' ('
i = 0
for k, v in kwargs['ext'].items():
i += 1
if i < extras:
pad = ', '
else:
pad = ''
text += str(v) + ' ' + k + pad
text += ')'
if text:
self.text = text
self.has_update = True
Дисплей обрабатывается универсальным Interface
учебный класс:
# coding: utf-8
from .View import View
import queue
import urwid
def unhandled_input(k):
if k == 'esc':
raise urwid.ExitMainLoop()
class Interface:
palette = [
...
]
def __init__(self, msg_queue, task_done):
self.view = View()
self.loop = urwid.MainLoop(self.view.content, self.palette,
unhandled_input=unhandled_input)
self.msg_queue = msg_queue
self.task_done = task_done
self.view.update(body='splash_screen')
self.check_messages(self.loop, None)
def check_messages(self, loop, *_args):
"""add message to bottom of screen"""
loop.set_alarm_in(
sec=0.2,
callback=self.check_messages,
)
try:
status = self.msg_queue.get_nowait()
except queue.Empty:
return
self.view.update(status=status)
Наконец то Interface
начинается с моего основного класса (который запускает read_file
и prepare_tables
потоки):
Interface(self.msg_queue, self.task_done).loop.run()
Status
поток подает очередь в то время как check_messages
метод Interface
класс потребляет его для обновления строки состояния. Пока здесь все работает как положено.
Чего я не смог достичь, так это выйти за рамки, когда база данных стала готова. Мне удалось изменить строку состояния в последний раз, чтобы указать, сколько таблиц / строк было прочитано. Тем не менее, это делается как последний шаг от prepare_tables
поток, так что это часть асинхронной операции.
Далее я хотел бы заменить заставку моим меню приложения, которое позволяет пользователю выбрать следующее действие (например, поиск некоторых данных или экспорт таблиц в формате CSV или...).
Тем не менее, так как Interface
работает в главном потоке, работает run()
метод loop
блоки, пока я не выйду из заставки с помощью клавиши escape. Вот журнал типичного прогона:
2018-05-04 16:43:14 DEBUG: [MainThread ] dumptool.__init__: start
2018-05-04 16:43:14 DEBUG: [MainThread ] status.__init__: start
2018-05-04 16:43:14 DEBUG: [MainThread ] status.__init__: stop
2018-05-04 16:43:14 DEBUG: [MainThread ] dumptool.setup: start
2018-05-04 16:43:14 DEBUG: [prepare_tables] dumptool.prepare_tables: start
2018-05-04 16:43:14 INFO: [prepare_tables] waiting for file content to become available...
2018-05-04 16:43:14 DEBUG: [read_file ] dumptool.read_file: start
2018-05-04 16:43:14 DEBUG: [MainThread ] dumptool.run_ui: start
2018-05-04 16:43:14 DEBUG: [MainThread ] Interface.__init__: start
2018-05-04 16:43:14 DEBUG: [MainThread ] View.__init__: start
2018-05-04 16:43:14 DEBUG: [MainThread ] Header.__init__: start
2018-05-04 16:43:14 DEBUG: [MainThread ] Header.__init__: stop
2018-05-04 16:43:14 DEBUG: [MainThread ] Footer.__init__: start
2018-05-04 16:43:14 DEBUG: [MainThread ] FStatus.__init__: start
2018-05-04 16:43:14 DEBUG: [MainThread ] FStatus.__init__: stop
2018-05-04 16:43:14 DEBUG: [MainThread ] FHelp.__init__: start
2018-05-04 16:43:14 DEBUG: [MainThread ] FHelp.__init__: stop
2018-05-04 16:43:14 DEBUG: [MainThread ] Footer.__init__: stop
2018-05-04 16:43:14 DEBUG: [MainThread ] View.__init__: stop
2018-05-04 16:43:14 DEBUG: [MainThread ] SplashScreen.__init__: start
2018-05-04 16:43:14 DEBUG: [MainThread ] SplashScreen.__init__: stop
2018-05-04 16:43:14 DEBUG: [MainThread ] Interface.__init__: stop
2018-05-04 16:43:33 DEBUG: [read_file ] dumptool.read_file: stop
2018-05-04 16:43:33 INFO: [prepare_tables] file available, building db
2018-05-04 16:44:00 INFO: [prepare_tables] Database ready, read 323 tables with 3225848 rows total
2018-05-04 16:44:00 DEBUG: [prepare_tables] dumptool.prepare_tables: stop
2018-05-04 16:44:10 DEBUG: [MainThread ] Interface.unhandled_input: start
2018-05-04 16:44:10 DEBUG: [MainThread ] Interface.unhandled_input: stop
2018-05-04 16:44:11 DEBUG: [MainThread ] dumptool.stop_threads: stopping threads
2018-05-04 16:44:11 DEBUG: [MainThread ] dumptool.run_ui: stop
Я прочитал все Urwid
Документы, которые я мог найти, я также искал похожие подходы здесь, на stackru и в других местах, но я не нашел полезных подсказок о том, как действовать дальше.
Единственное, что сработало на полпути, - это использовать check_messages()
метод также проверить, если task_done
событие установлено, и если так, вызовите другой метод, который управлял интерфейсом.
Однако у меня сложилось впечатление, что это не является ни предпочтительным, ни питонским способом, а также не предназначенным для Urwid
Пользовательский интерфейс, который, в конце концов, имеет run()
метод, который должен быть там, где находится логика приложения.
Но, как я ни старался, я не нашел способа правильно подключить или переопределить run()
метод. Кто-нибудь из вас может указать мне направление работы? Делал ли я некоторые ложные предположения по пути при разработке приложения? Любая помощь очень ценится, спасибо заранее!