Асинхронные события для рабочего процесса для приложения Django

В моем веб-приложении Django у меня есть рабочая программа, которая является клиентом API с ограниченной скоростью и отвечает за обработку всех запросов к этому API из потоков моего сервера. Я использую свою базу данных для хранения очереди задач. Задачи могут приходить большими группами или вообще не быть. Я использую цикл обработки событий для опроса очереди и управления задержкой между задачами в случае превышения ограничения скорости (ограничение является динамическим). Это все работает нормально, но единственное, что я хочу сделать, - это дать работнику возможность перестать работать с базой данных, если очередь иссякает, и чтобы мое приложение Django сообщило работнику о том, что очередь больше не существует. снова высохнуть.

Схематично в псевдо-Python это выглядит так:

state = NORMAL
delay_time = NORMAL_DELAY

while True:
    sleep(delay_time)

    if state == DORMANT:
        continue

    task = get_next_task() # hits database
    if task is None:
        state = DORMANT
        delay_time = NORMAL_TIME

    try:
        execute(task)
    except RateExceeded:
        delay_time = backoff(delay_time)
    else:
        delay_time = NORMAL_DELAY

# Triggered by web layer
def asynchronous_event():
    state = NORMAL

И я либо хочу, чтобы асинхронное событие, инициированное веб-слоем, могло вернуть состояние в состояние NORMAL (которое будет выполняться во время sleep) или какая-то другая легкая проверка, которая не добавит ненужных зацикленных запросов к БД.

В установке с одним компьютером я мог бы просто использовать сигналы, но, очевидно, это не работает в установке с несколькими машинами. Я пытаюсь не запускать отдельный сервер очереди сообщений только для цели этого одного сигнала. Я размещен на Dotcloud, на случай, если это сыграет роль в работе сетевых решений. В идеале, что-то более или менее эквивалентное в простоте реализации в качестве обработчика сигнала. Я посмотрел в ZeroRPC, но я не уверен, как включить его в мой цикл событий.

Есть идеи?

редактировать

Я смотрю в ZeroMQ, чтобы решить эту проблему, но я мог бы использовать некоторую помощь. Сложность состоит в том, что будет несколько одновременных экземпляров веб-сервера, и при повторном развертывании мне необходимо обеспечить плавный переход от одного работника к его преемнику. Итак, потерпите меня, потому что моя терминология, вероятно, неверна, и мне кажется, что лучше всего сделать, чтобы каждый работник асинхронно связывался с адресом в качестве почтового ящика, который проверяется в основном цикле, чтобы выйти из спящего режима, Каждый работник создает в базе данных запись своего IP-адреса с отметкой времени создания. При отправке запроса веб-сервер публикует сообщение для всех работников. Когда работник получает сообщение, он проверяет, является ли работник рабочим с последней датой создания: если это так, он обрабатывает сообщение, а если нет, он завершает себя.

Это кажется большой проблемой, но я хочу понять это правильно, потому что, скорее всего, я буду использовать эту парадигму в другом месте моего приложения.

2 ответа

Решение

Как оказалось, я решил просто попасть в базу данных каждый цикл. Но я также решил, что ZeroMQ - это путь, если я когда-либо захочу приложить усилия, чтобы сделать это действительно эффективным. Вот как это работает:

Каждый работник связывает сокет абонента ZeroMQ и регистрируется в базе данных работников, которая содержит IP-адрес и порт сокета. Веб-темы публикуют DO_TASK сообщение для последнего зарегистрированного работника и QUIT сообщения для любых других, которые могут работать.

Я внедряю Dotcloud, и их поддержка говорит, что использование переменных среды пользовательского сервиса и параметров сборки позволит мне открыть необходимые порты и получить IP-адреса экземпляров рабочих задач.

Как насчет применения экспоненциального отката к рабочему месту delay_time если нет задачи в базе данных? Это может значительно снизить нагрузку на БД без дополнительной сложности передачи сообщений из веб-приложения на рабочую работу. Что-то вроде:

delay_time = NORMAL_DELAY

while True:
    sleep(delay_time)
    task = get_next_task() # hits database

    if task:
        try:
            execute(task)
        except RateExceeded:
            pass
        else:
            delay_time = NORMAL_DELAY
            continue

     delay_time = backoff(delay_time)
Другие вопросы по тегам