Изменение рабочих задач python-gearman во время обработки задания
Я пытаюсь изменить задачи, доступные на работнике python-gearman во время его рабочего цикла. Моя причина для этого - дать мне немного контроля над моими рабочими процессами и позволить им перезагрузиться из базы данных. Мне нужно, чтобы каждый работник перезагружался через регулярные промежутки времени, но я не хочу просто убивать процессы, и я хочу, чтобы служба была постоянно доступна, что означает, что мне нужно перезагружать пакетами. Таким образом, у меня будет 4 перезагружаемых рабочих, в то время как еще 4 рабочих доступны для обработки, а затем перезагрузит следующих 4 рабочих.
Процесс:
- Начните процесс перезагрузки 4 раза.
- отменить регистрацию
reload
процесс - перезагрузить набор данных
- зарегистрировать
finishReload
задача - вернуть
- отменить регистрацию
- Повторите шаг 1, пока не будет рабочих с
reload
задание зарегистрировано - Начните
finishReload
(1) задание, пока нет рабочих сfinishReload
задание доступно.
(1) задача finishReload отменяет регистрацию finishReload
задание и регистрирует reload
задание, а затем возвращается.
Теперь проблема, с которой я сталкиваюсь, заключается в том, что задание не выполняется, когда я изменяю задачи, доступные для рабочего процесса. Там нет сообщений об ошибках или исключений, просто "ОШИБКА" в журнале передач. Вот быстрая программа, которая копирует проблему.
РАБОЧИЙ
import gearman
def reversify(gmWorker, gmJob):
return "".join(gmJob.data[::-1])
def strcount(gmWorker, gmJob):
gmWorker.unregister_task('reversify') # problem line
return str(len(gmJob.data))
worker = gearman.GearmanWorker(['localhost:4730'])
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)
while True:
worker.work()
КЛИЕНТ
import gearman
client = gearman.GearmanClient(['localhost:4730'])
a = client.submit_job('reversify', 'spam and eggs')
print a.result
>>> sgge dna maps
a = client.submit_job('strcount', 'spam and eggs')
...
Пожалуйста, дайте мне знать, если есть что-то, что я могу объяснить.
РЕДАКТИРОВАТЬ: Я знаю, что кто-то попросит посмотреть журнал я упоминал. Я также отправил этот вопрос в группу gearman в Google, и там есть журнал.
2 ответа
Это похоже на создание подкласса класса GearmanWorker, и добавление нескольких флагов может обойти эту проблему. Мне нужно разрешить завершить работу, прежде чем я начну вводить новые команды от рабочего к серверу, что, кажется, прерывает текущую работу. Так что, если мы перезаписать on_job_complete
функция, которую мы можем проверить на флаг включения / выключения и действовать на них после того, как мы выпустим send_job_complete
команда. Новая рабочая программа выглядит следующим образом:
РАБОЧИЙ
import gearman
def reversify(gmWorker, gmJob):
return "".join(gmJob.data[::-1])
def enable_reversify(gmWorker, gmJob):
myWorker.enableReversify = 1
return 'OK'
def strcount(gmWorker, gmJob):
myWorker.enableReversify = -1
return str(len(gmJob.data))
class myWorker(gearman.GearmanWorker):
enableReversify = 0 # 0 = do nothing, -1 = turn off, 1 = turn on
def on_job_complete(self, current_job, job_result):
self.send_job_complete(current_job, job_result)
### check the flag here and enable or disable tasks ###
if myWorker.enableReversify == -1:
self.unregister_task('reversify')
if myWorker.enableReversify == 1:
self.register_task('reversify', reversify)
myWorker.enableReversify = 0 # reset the flag
return True
worker = myWorker(['localhost:4730'])
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)
worker.register_task('enableReversify', enable_reversify)
while True:
worker.work()
На первый взгляд кажется, что проблема заключается в том, что вы начинаете работу, а затем отменяете регистрацию способности рабочих выполнять эту работу с сервера заданий до ее завершения.