Реализация таймера в Python
Общий обзор
- У меня есть проект Django среднего размера
- У меня есть куча префиксных деревьев в памяти (в отличие от БД)
- Узлы этих деревьев представляют сущности / объекты, на которые распространяется тайм-аут. Т.е. мне нужно тайм-аут этих узлов в разные моменты времени
Дизайн:
- По сути, мне была нужна конструкция Timer, которая позволяла бы мне запускать сбрасываемый таймер с 1 выстрелом, связывать и отдавать ему обратный вызов, который может выполнить некоторую операцию над объектом, создающим таймер, который в этом случае является узлом дерева.
Посмотрев различные варианты, я не смог найти ничего, что мог бы использовать изначально (например, какое-то приложение django). Объект Timer в Python не подходит для этого, так как он не будет масштабироваться / работать. Поэтому я решил написать свой собственный таймер на основе:
- Сортированный список объектов с изменением времени, который содержит временной горизонт
- Механизм срабатывания "галочки"
Варианты реализации:
- Обошел с оберткой вокруг Bisect для отсортированного списка дельты: http://code.activestate.com/recipes/577197-sortedcollection/
- Пошёл с сельдереем, чтобы поставить галочку - Гранулярность 1 минута, когда рабочий вызвал функцию timer_tick, предоставленную моим классом Timer. По сути, timer_tick должен проходить через отсортированный список, уменьшая головной узел каждый тик. Затем, если какие-либо узлы опустятся до 0, начните обратный вызов и удалите эти узлы из отсортированного списка таймеров.
- Создание таймера включает в себя создание экземпляра объекта Timer, который возвращает идентификатор объекта. Этот идентификатор хранится в БД и связан с записью в БД, которая представляет объект, создающий таймер
Дополнительные структуры данных: для отслеживания экземпляров Timer (которые создаются для каждого создания таймера) у меня есть словарь WeakRef, который отображает идентификатор на obj
Итак, по сути, у меня есть 2 структуры данных в памяти моего основного проекта Django.
Постановка задачи:
Поскольку работнику сельдерея необходимо пройти список таймеров, а также, возможно, изменить карту id2obj, похоже, мне нужно найти способ поделиться состоянием между моим работником сельдерея и основным
Проходя через SO/Google, я нахожу следующие предложения
- Менеджер
- Общая память
К сожалению, bisect wrapper не очень хорошо подходит для травления и / или совместного использования. Я попробовал подход "Менеджер", создавая dict и пытаясь встроить отсортированный список в Dict. В результате возникла ошибка (я полагаю, что это довольно ожидаемо, поскольку память, хранящаяся в отсортированном списке, не используется совместно и внедряет ее в "" общая "память объекта не будет работать"
Наконец-то... Вопрос:
- Есть ли способ, которым я могу поделиться своими SortedCollection и Weakref Dict с рабочим потоком
Альтернативное решение:
Как насчет того, чтобы сохранить рабочий поток простым... иметь запись в БД для каждого тика, а затем использовать сигнал post Db для получения уведомлений на главном и выполнять обработку таймеров с истекшим сроком на главном. Конечно, дело в том, что я теряю распараллеливание.
1 ответ
Давайте начнем с некоторых комментариев о вашей существующей реализации:
Обошел с оберткой вокруг Bisect для отсортированного списка дельты: http://code.activestate.com/recipes/577197-sortedcollection/
Хотя это дает вам O(1) всплывающих окон (пока вы сохраняете список в обратном порядке времени), он делает каждую вставку O(N) (и аналогично для менее распространенных операций, таких как удаление произвольных заданий, если у вас есть API "отмены")). Поскольку вы делаете столько же операций вставки, сколько всплывающих окон, это означает, что все это алгоритмически не лучше, чем несортированный список.
Заменив это на heapq
(это именно то, для чего они) дает вам O (журнал N) вставки. (Обратите внимание, что Python heapq
не имеет peek
, но это потому что heap[0]
эквивалентно heap.peek(0)
так что тебе это не нужно.)
Если вам нужно выполнить другие операции (отменить, выполнить итерацию без разрушения и т. Д.) O(log N), вам нужно дерево поиска; посмотри на blist
а также bintrees
на PyPI для некоторых хороших.
Пошёл с сельдереем, чтобы поставить галочку - Гранулярность 1 минута, когда рабочий вызвал функцию timer_tick, предоставленную моим классом Timer. По сути, timer_tick должен проходить через отсортированный список, уменьшая головной узел каждый тик. Затем, если какие-либо узлы опустятся до 0, начните обратный вызов и удалите эти узлы из отсортированного списка таймеров.
Гораздо приятнее просто сохранять целевое время вместо дельт. С целевым временем вам просто нужно сделать это:
while q.peek().timestamp <= now():
process(q.pop())
Опять же, это O(1), а не O(N), и это намного проще, и он рассматривает элементы в очереди как неизменяемые, и это позволяет избежать любых возможных проблем с итерациями, которые занимают больше времени, чем тик (вероятно, не проблема). с 1-минутными тиками…).
Теперь к вашему основному вопросу:
Есть ли способ, которым я могу поделиться своей SortedCollection
Да. Если вы просто хотите приоритетную кучу (timestamp, id)
пары, вы можете вписать это в multiprocessing.Array
так же легко, как list
За исключением необходимости явно отслеживать длину. Тогда вам просто нужно синхронизировать каждую операцию, и все.
Если вы тикаете раз в минуту и ожидаете, что будете заняты чаще, чем нет, вы можете просто использовать Lock
синхронизировать, и иметь график-работник (ы), отметьте себя.
Но, честно говоря, я бы полностью снял галочки и просто использовал Condition
- это более гибко и концептуально проще (даже если это немного больше кода), и это означает, что вы используете 0% ЦП, когда не нужно выполнять работу, и быстро и плавно реагируете, когда вы находитесь под нагрузкой. Например:
def schedule_job(timestamp, job):
job_id = add_job_to_shared_dict(job) # see below
with scheduler_condition:
scheduler_heap.push((timestamp, job))
scheduler_condition.notify_all()
def scheduler_worker_run_once():
with scheduler_condition:
while True:
top = scheduler_heap.peek()
if top is not None:
delay = top[0] - now()
if delay <= 0:
break
scheduler_condition.wait(delay)
else:
scheduler_condition.wait()
top = scheduler_heap.pop()
if top is not None:
job = pop_job_from_shared_dict(top[1])
process_job(job)
Во всяком случае, это приводит нас к слабому полному рабочих мест.
Поскольку уязвимый объект явно хранит ссылки на внутрипроцессные объекты, нет никакого смысла делиться им между процессами. То, что вы хотите сохранить, - это неизменяемые объекты, которые определяют, какими являются задания, а не сами изменяемые задания. Тогда это просто старый старый диктат.
Но, тем не менее, простой старый диктат - нелегкая вещь для всех процессов.
Самый простой способ сделать это - использовать dbm
база данных (или shelve
обертка вокруг одного) вместо в памяти dict
синхронизированы с Lock
, Но это означает повторную очистку и повторное открытие базы данных каждый раз, когда кто-либо хочет ее изменить, что может быть неприемлемо.
Переход, скажем, на базу данных sqlite3 может показаться излишним, но это может быть намного проще.
С другой стороны... единственные операции, которые у вас есть, - это "сопоставить следующий идентификатор с этим заданием и вернуть идентификатор" и "выдать и вернуть задание, указанное этим идентификатором". Это действительно должно быть диктом? Ключи являются целыми числами, и вы управляете ими. Array
плюс один Value
для следующего ключа и Lock
и вы почти закончили. Проблема в том, что вам нужна какая-то схема переполнения ключа. Вместо просто next_id += 1
, вам нужно перевернуться и проверить наличие уже используемых слотов:
with lock:
next_id += 1
if next_id == size: next_id = 0
if arr[next_id] is None:
arr[next_id] = job
return next_id
Другой вариант - просто сохранить dict в основном процессе и использовать Queue
чтобы другие процессы запрашивали его.