Потребитель / Производитель "своевременная" очередь

Я реализовал очередь приоритетов потребителя / производителя, где приоритетом фактически является отметка времени, представляющая, когда элемент должен быть доставлен. Это работает довольно хорошо, но я хотел бы знать, есть ли у кого-нибудь лучшая идея для реализации этого или комментарии о текущей реализации.

Код в Python. Создан единый поток, чтобы вовремя разбудить ожидающих потребителей. Я знаю, что это анти-шаблон для создания потока в библиотеке, но я не мог придумать другой метод.

Вот код:

import collections
import heapq
import threading
import time

class TimelyQueue(threading.Thread):
    """
    Implements a similar but stripped down interface of Queue which
    delivers items on time only.
    """

    class Locker:
        def __init__(self, lock):
            self.l = lock
        def __enter__(self):
            self.l.acquire()
            return self.l
        def __exit__(self, type, value, traceback):
            self.l.release()

    # Optimization to avoid wasting CPU cycles when something
    # is about to happen in less than 5 ms.
    _RESOLUTION = 0.005

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.queue = []
        self.triggered = collections.deque()
        self.putcond = threading.Condition()
        self.getcond = threading.Condition()
        # Optimization to avoid waking the thread uselessly.
        self.putwaketime = 0

    def put(self, when, item):
        with self.Locker(self.putcond):
            heapq.heappush(self.queue, (when, item))
            if when < self.putwaketime or self.putwaketime == 0:
                self.putcond.notify()

    def get(self, timeout=None):
        with self.Locker(self.getcond):
            if len(self.triggered) > 0:
                when, item = self.triggered.popleft()
                return item
                self.getcond.wait(timeout)
            try:
                when, item = self.triggered.popleft()
            except IndexError:
                return None
            return item

    def qsize(self):
        with self.Locker(self.putcond):
            return len(self.queue)

    def run(self):
        with self.Locker(self.putcond):
            maxwait = None
            while True:
                curtime = time.time()
                try:
                    when, item = self.queue[0]
                    maxwait = when - curtime
                    self.putwaketime = when
                except IndexError:
                    maxwait = None
                    self.putwaketime = 0
                self.putcond.wait(maxwait)

                curtime = time.time()
                while True:
                    # Don't dequeue now, we are not sure to use it yet.
                    try:
                        when, item = self.queue[0]
                    except IndexError:
                        break
                    if when > curtime + self._RESOLUTION:
                        break

                    self.triggered.append(heapq.heappop(self.queue))
                if len(self.triggered) > 0:
                    with self.Locker(self.getcond):
                        self.getcond.notify()


if __name__ == "__main__":
    q = TimelyQueue()
    q.start()

    N = 50000
    t0 = time.time()
    for i in range(N):
        q.put(time.time() + 2, i)
    dt = time.time() - t0
    print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
    t0 = time.time()
    i = 0
    while i < N:
        a = q.get(3)
        if i == 0:
            dt = time.time() - t0
            print "start get after %.3fs" % dt
            t0 = time.time()
        i += 1
    dt = time.time() - t0
    print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)

2 ответа

Для записи, я реализовал то, что вы предложили, используя фабрику таймеров. Я запустил небольшой тест, используя версию выше, а новую версию, используя threading.Timer учебный класс:

  1. Первая реализация

    • С разрешением по умолчанию (5 мс, то есть все в пределах 5 мс окна запускается вместе), он достигает около 88k put()/ сек и 69к get()/ Сек.

    • С разрешением 0 мс (без оптимизации) он достигает около 88k put()/ сек и 55к get()/ Сек.

  2. Вторая реализация

    • С разрешением по умолчанию (5 мс) он достигает около 88k put()/ сек и 65к get()/ Сек.

    • При разрешении 0 мс он достигает около 88k put()/ сек и 62к get()/ Сек.

Признаюсь, я удивлен, что вторая реализация быстрее без оптимизации разрешения. Сейчас слишком поздно для расследования.

import collections
import heapq
import threading
import time

class TimelyQueue:
    """
    Implements a similar but stripped down interface of Queue which
    delivers items on time only.
    """

    def __init__(self, resolution=5, timerfactory=threading.Timer):
        """
        `resolution' is an optimization to avoid wasting CPU cycles when
        something is about to happen in less than X ms.
        """
        self.resolution = float(resolution) / 1000
        self.timerfactory = timerfactory
        self.queue = []
        self.triggered = collections.deque()
        self.putcond = threading.Condition()
        self.getcond = threading.Condition()
        # Optimization to avoid waking the thread uselessly.
        self.putwaketime = 0
        self.timer = None
        self.terminating = False

    def __arm(self):
        """
        Arm the next timer; putcond must be acquired!
        """
        curtime = time.time()
        when, item = self.queue[0]
        interval = when - curtime
        self.putwaketime = when
        self.timer = self.timerfactory(interval, self.__fire)
        self.timer.start()

    def __fire(self):
        with self.putcond:
            curtime = time.time()
            debug = 0
            while True:
                # Don't dequeue now, we are not sure to use it yet.
                try:
                    when, item = self.queue[0]
                except IndexError:
                    break
                if when > curtime + self.resolution:
                    break

                debug += 1
                self.triggered.append(heapq.heappop(self.queue))
            if len(self.triggered) > 0:
                with self.getcond:
                    self.getcond.notify(len(self.triggered))
            if self.terminating:
                return
            if len(self.queue) > 0:
                self.__arm()

    def put(self, when, item):
        """
        `when' is a Unix time from Epoch.
        """
        with self.putcond:
            heapq.heappush(self.queue, (when, item))
            if when >= self.putwaketime and self.putwaketime != 0:
                return
            # Arm next timer.
            if self.timer is not None:
                self.timer.cancel()
            self.__arm()

    def get(self, timeout=None):
        """
        Timely return the next object on the queue.
        """
        with self.getcond:
            if len(self.triggered) > 0:
                when, item = self.triggered.popleft()
                return item
            self.getcond.wait(timeout)
            try:
                when, item = self.triggered.popleft()
            except IndexError:
                return None
            return item

    def qsize(self):
        """
        Self explanatory.
        """
        with self.putcond:
            return len(self.queue)

    def terminate(self):
        """
        Request the embedded thread to terminate.
        """
        with self.putcond:
            self.terminating = True
            if self.timer is not None:
                self.timer.cancel()
            self.putcond.notifyAll()


if __name__ == "__main__":
    q = TimelyQueue(0)
    N = 100000
    t0 = time.time()
    for i in range(N):
        q.put(time.time() + 2, i)
    dt = time.time() - t0
    print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
    t0 = time.time()
    i = 0
    while i < N:
        a = q.get(3)
        if i == 0:
            dt = time.time() - t0
            print "start get after %.3fs" % dt
            t0 = time.time()
        i += 1
    dt = time.time() - t0
    print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)
    q.terminate()
    # Give change to the thread to exit properly, otherwise we may get
    # a stray interpreter exception.
    time.sleep(0.1)

Единственное, для чего вам действительно нужен фоновый поток - это таймер, чтобы пнуть официантов, когда он закончится, верно?

Во-первых, вы можете реализовать это с threading.Timer вместо явного фонового потока. Но, хотя это может быть проще, на самом деле это не решит проблему создания потока за спиной пользователя, хотят ли они этого или нет. Кроме того, с threading.Timer вы фактически отключаете новый поток каждый раз, когда перезапускаете таймер, что может быть проблемой производительности. (У вас есть только один за раз, но тем не менее запуск и остановка потоков не бесплатны.)

Если вы посмотрите на модули PyPI, рецепты ActiveState и различные инфраструктуры, существует много реализаций, которые позволяют запускать несколько таймеров в одном фоновом потоке. Это решило бы вашу проблему.

Но это все еще не идеальное решение. Например, скажем, моему приложению нужно 20 TimelyQueue объекты - или TimelyQueue плюс 19 других вещей, которым всем нужны таймеры. Я бы все равно закончил с 20 нитями. Или, скажем, я создаю сервер сокетов или приложение с графическим интерфейсом (два наиболее очевидных варианта использования для вашего TimelyQueue; Я могу реализовать таймер поверх моего цикла событий (или, скорее всего, просто использовать таймер, который поставляется с платформой), так зачем мне вообще нужен поток?

Выход из этого состоит в том, чтобы предложить крючок для поставки любой фабрики таймеров:

def __init__(self, timerfactory = threading.Timer):
    self.timerfactory = timerfactory
    ...

Теперь, когда вам нужно настроить таймер:

if when < self.waketime:
    self.timer.cancel()
    self.timer = self.timerfactory(when - now(), self.timercallback)
    self.waketime = when

Для быстрых и грязных случаев использования этого было бы достаточно из коробки. Но если я, например, использую twisted Я могу просто использовать TimelyQueue(twisted.reactor.callLater) и теперь таймеры очереди проходят через twisted цикл событий. Или, если у меня есть реализация с несколькими таймерами в одном потоке, которую я использую в другом месте, TimelyQueue(multiTimer.add) и теперь таймеры очереди идут в том же потоке, что и все мои другие таймеры.

Если вы хотите, вы можете поставить лучше, чем по умолчанию, чем threading.Timer но на самом деле, я думаю, что большинство людей, которые нуждаются в чем-то лучше, чем threading.Timer сможет предоставить что-то, что лучше для их конкретного приложения, чем то, что вы предоставляете.

Конечно, не каждая реализация таймера имеет тот же API, что и threading.Timer - хотя вы будете удивлены, сколько из них делают. Но это не так сложно написать адаптер, если у вас есть таймер, который вы хотите использовать с TimelyQueue но у него неправильный интерфейс. Например, если я создаю приложение PyQt4/PySide, QTimer не имеет cancel метод, и занимает мс вместо секунд, поэтому мне придется сделать что-то вроде этого:

class AdaptedQTimer(object):
    def __init__(self, timeout, callback):
        self.timer = QTimer.singleShot(timeout * 1000, callback)
    def cancel(self):
        self.timer.stop()

q = TimelyQueue(AdaptedQTimer)

Или, если я хочу интегрировать очередь в QObject более прямо, я мог бы обернуть QObject.startTimer() и мой timerEvent(self) вызов метода обратного вызова.

Как только вы рассматриваете адаптеры, еще одна идея. Я не думаю, что это того стоит, но стоит подумать. Если ваш таймер взял метку времени, а не timedelta, и имеет adjust метод, а не / вместо cancel и провел свой собственный waketime, ваш TimelyQueue реализация может быть проще и, возможно, более эффективной. В put, у вас есть что-то вроде этого:

if self.timer is None:
    self.timer = self.timerfactory(when)
elif when < self.timer.waketime:
    self.timer.adjust(when)

Конечно, большинство таймеров не предоставляют этот интерфейс. Но если кто-то имеет его или хочет его создать, он может получить выгоду. А для всех остальных вы можете предоставить простой адаптер, который превращает threading.Timer Стиль таймера в том виде, который вам нужен, что-то вроде:

def timerFactoryAdapter(threadingStyleTimerFactory):
    class TimerFactory(object):
        def __init__(self, timestamp, callback):
            self.timer = threadingStyleTimerFactory(timestamp - now(), callback)
            self.callback = callback
        def cancel(self):
            return self.timer.cancel()
        def adjust(self, timestamp):
            self.timer.cancel()
            self.timer = threadingStyleTimerFactory(timestamp - now(), self.callback)
Другие вопросы по тегам