Потребитель / Производитель "своевременная" очередь
Я реализовал очередь приоритетов потребителя / производителя, где приоритетом фактически является отметка времени, представляющая, когда элемент должен быть доставлен. Это работает довольно хорошо, но я хотел бы знать, есть ли у кого-нибудь лучшая идея для реализации этого или комментарии о текущей реализации.
Код в Python. Создан единый поток, чтобы вовремя разбудить ожидающих потребителей. Я знаю, что это анти-шаблон для создания потока в библиотеке, но я не мог придумать другой метод.
Вот код:
import collections
import heapq
import threading
import time
class TimelyQueue(threading.Thread):
"""
Implements a similar but stripped down interface of Queue which
delivers items on time only.
"""
class Locker:
def __init__(self, lock):
self.l = lock
def __enter__(self):
self.l.acquire()
return self.l
def __exit__(self, type, value, traceback):
self.l.release()
# Optimization to avoid wasting CPU cycles when something
# is about to happen in less than 5 ms.
_RESOLUTION = 0.005
def __init__(self):
threading.Thread.__init__(self)
self.daemon = True
self.queue = []
self.triggered = collections.deque()
self.putcond = threading.Condition()
self.getcond = threading.Condition()
# Optimization to avoid waking the thread uselessly.
self.putwaketime = 0
def put(self, when, item):
with self.Locker(self.putcond):
heapq.heappush(self.queue, (when, item))
if when < self.putwaketime or self.putwaketime == 0:
self.putcond.notify()
def get(self, timeout=None):
with self.Locker(self.getcond):
if len(self.triggered) > 0:
when, item = self.triggered.popleft()
return item
self.getcond.wait(timeout)
try:
when, item = self.triggered.popleft()
except IndexError:
return None
return item
def qsize(self):
with self.Locker(self.putcond):
return len(self.queue)
def run(self):
with self.Locker(self.putcond):
maxwait = None
while True:
curtime = time.time()
try:
when, item = self.queue[0]
maxwait = when - curtime
self.putwaketime = when
except IndexError:
maxwait = None
self.putwaketime = 0
self.putcond.wait(maxwait)
curtime = time.time()
while True:
# Don't dequeue now, we are not sure to use it yet.
try:
when, item = self.queue[0]
except IndexError:
break
if when > curtime + self._RESOLUTION:
break
self.triggered.append(heapq.heappop(self.queue))
if len(self.triggered) > 0:
with self.Locker(self.getcond):
self.getcond.notify()
if __name__ == "__main__":
q = TimelyQueue()
q.start()
N = 50000
t0 = time.time()
for i in range(N):
q.put(time.time() + 2, i)
dt = time.time() - t0
print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
t0 = time.time()
i = 0
while i < N:
a = q.get(3)
if i == 0:
dt = time.time() - t0
print "start get after %.3fs" % dt
t0 = time.time()
i += 1
dt = time.time() - t0
print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)
2 ответа
Для записи, я реализовал то, что вы предложили, используя фабрику таймеров. Я запустил небольшой тест, используя версию выше, а новую версию, используя threading.Timer
учебный класс:
Первая реализация
С разрешением по умолчанию (5 мс, то есть все в пределах 5 мс окна запускается вместе), он достигает около 88k
put()
/ сек и 69кget()
/ Сек.С разрешением 0 мс (без оптимизации) он достигает около 88k
put()
/ сек и 55кget()
/ Сек.
Вторая реализация
С разрешением по умолчанию (5 мс) он достигает около 88k
put()
/ сек и 65кget()
/ Сек.При разрешении 0 мс он достигает около 88k
put()
/ сек и 62кget()
/ Сек.
Признаюсь, я удивлен, что вторая реализация быстрее без оптимизации разрешения. Сейчас слишком поздно для расследования.
import collections
import heapq
import threading
import time
class TimelyQueue:
"""
Implements a similar but stripped down interface of Queue which
delivers items on time only.
"""
def __init__(self, resolution=5, timerfactory=threading.Timer):
"""
`resolution' is an optimization to avoid wasting CPU cycles when
something is about to happen in less than X ms.
"""
self.resolution = float(resolution) / 1000
self.timerfactory = timerfactory
self.queue = []
self.triggered = collections.deque()
self.putcond = threading.Condition()
self.getcond = threading.Condition()
# Optimization to avoid waking the thread uselessly.
self.putwaketime = 0
self.timer = None
self.terminating = False
def __arm(self):
"""
Arm the next timer; putcond must be acquired!
"""
curtime = time.time()
when, item = self.queue[0]
interval = when - curtime
self.putwaketime = when
self.timer = self.timerfactory(interval, self.__fire)
self.timer.start()
def __fire(self):
with self.putcond:
curtime = time.time()
debug = 0
while True:
# Don't dequeue now, we are not sure to use it yet.
try:
when, item = self.queue[0]
except IndexError:
break
if when > curtime + self.resolution:
break
debug += 1
self.triggered.append(heapq.heappop(self.queue))
if len(self.triggered) > 0:
with self.getcond:
self.getcond.notify(len(self.triggered))
if self.terminating:
return
if len(self.queue) > 0:
self.__arm()
def put(self, when, item):
"""
`when' is a Unix time from Epoch.
"""
with self.putcond:
heapq.heappush(self.queue, (when, item))
if when >= self.putwaketime and self.putwaketime != 0:
return
# Arm next timer.
if self.timer is not None:
self.timer.cancel()
self.__arm()
def get(self, timeout=None):
"""
Timely return the next object on the queue.
"""
with self.getcond:
if len(self.triggered) > 0:
when, item = self.triggered.popleft()
return item
self.getcond.wait(timeout)
try:
when, item = self.triggered.popleft()
except IndexError:
return None
return item
def qsize(self):
"""
Self explanatory.
"""
with self.putcond:
return len(self.queue)
def terminate(self):
"""
Request the embedded thread to terminate.
"""
with self.putcond:
self.terminating = True
if self.timer is not None:
self.timer.cancel()
self.putcond.notifyAll()
if __name__ == "__main__":
q = TimelyQueue(0)
N = 100000
t0 = time.time()
for i in range(N):
q.put(time.time() + 2, i)
dt = time.time() - t0
print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
t0 = time.time()
i = 0
while i < N:
a = q.get(3)
if i == 0:
dt = time.time() - t0
print "start get after %.3fs" % dt
t0 = time.time()
i += 1
dt = time.time() - t0
print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)
q.terminate()
# Give change to the thread to exit properly, otherwise we may get
# a stray interpreter exception.
time.sleep(0.1)
Единственное, для чего вам действительно нужен фоновый поток - это таймер, чтобы пнуть официантов, когда он закончится, верно?
Во-первых, вы можете реализовать это с threading.Timer
вместо явного фонового потока. Но, хотя это может быть проще, на самом деле это не решит проблему создания потока за спиной пользователя, хотят ли они этого или нет. Кроме того, с threading.Timer
вы фактически отключаете новый поток каждый раз, когда перезапускаете таймер, что может быть проблемой производительности. (У вас есть только один за раз, но тем не менее запуск и остановка потоков не бесплатны.)
Если вы посмотрите на модули PyPI, рецепты ActiveState и различные инфраструктуры, существует много реализаций, которые позволяют запускать несколько таймеров в одном фоновом потоке. Это решило бы вашу проблему.
Но это все еще не идеальное решение. Например, скажем, моему приложению нужно 20 TimelyQueue
объекты - или TimelyQueue
плюс 19 других вещей, которым всем нужны таймеры. Я бы все равно закончил с 20 нитями. Или, скажем, я создаю сервер сокетов или приложение с графическим интерфейсом (два наиболее очевидных варианта использования для вашего TimelyQueue
; Я могу реализовать таймер поверх моего цикла событий (или, скорее всего, просто использовать таймер, который поставляется с платформой), так зачем мне вообще нужен поток?
Выход из этого состоит в том, чтобы предложить крючок для поставки любой фабрики таймеров:
def __init__(self, timerfactory = threading.Timer):
self.timerfactory = timerfactory
...
Теперь, когда вам нужно настроить таймер:
if when < self.waketime:
self.timer.cancel()
self.timer = self.timerfactory(when - now(), self.timercallback)
self.waketime = when
Для быстрых и грязных случаев использования этого было бы достаточно из коробки. Но если я, например, использую twisted
Я могу просто использовать TimelyQueue(twisted.reactor.callLater)
и теперь таймеры очереди проходят через twisted
цикл событий. Или, если у меня есть реализация с несколькими таймерами в одном потоке, которую я использую в другом месте, TimelyQueue(multiTimer.add)
и теперь таймеры очереди идут в том же потоке, что и все мои другие таймеры.
Если вы хотите, вы можете поставить лучше, чем по умолчанию, чем threading.Timer
но на самом деле, я думаю, что большинство людей, которые нуждаются в чем-то лучше, чем threading.Timer
сможет предоставить что-то, что лучше для их конкретного приложения, чем то, что вы предоставляете.
Конечно, не каждая реализация таймера имеет тот же API, что и threading.Timer
- хотя вы будете удивлены, сколько из них делают. Но это не так сложно написать адаптер, если у вас есть таймер, который вы хотите использовать с TimelyQueue
но у него неправильный интерфейс. Например, если я создаю приложение PyQt4/PySide, QTimer
не имеет cancel
метод, и занимает мс вместо секунд, поэтому мне придется сделать что-то вроде этого:
class AdaptedQTimer(object):
def __init__(self, timeout, callback):
self.timer = QTimer.singleShot(timeout * 1000, callback)
def cancel(self):
self.timer.stop()
q = TimelyQueue(AdaptedQTimer)
Или, если я хочу интегрировать очередь в QObject
более прямо, я мог бы обернуть QObject.startTimer()
и мой timerEvent(self)
вызов метода обратного вызова.
Как только вы рассматриваете адаптеры, еще одна идея. Я не думаю, что это того стоит, но стоит подумать. Если ваш таймер взял метку времени, а не timedelta, и имеет adjust
метод, а не / вместо cancel
и провел свой собственный waketime
, ваш TimelyQueue
реализация может быть проще и, возможно, более эффективной. В put
, у вас есть что-то вроде этого:
if self.timer is None:
self.timer = self.timerfactory(when)
elif when < self.timer.waketime:
self.timer.adjust(when)
Конечно, большинство таймеров не предоставляют этот интерфейс. Но если кто-то имеет его или хочет его создать, он может получить выгоду. А для всех остальных вы можете предоставить простой адаптер, который превращает threading.Timer
Стиль таймера в том виде, который вам нужен, что-то вроде:
def timerFactoryAdapter(threadingStyleTimerFactory):
class TimerFactory(object):
def __init__(self, timestamp, callback):
self.timer = threadingStyleTimerFactory(timestamp - now(), callback)
self.callback = callback
def cancel(self):
return self.timer.cancel()
def adjust(self, timestamp):
self.timer.cancel()
self.timer = threadingStyleTimerFactory(timestamp - now(), self.callback)