Изменение рабочих задач python-gearman во время обработки задания

Я пытаюсь изменить задачи, доступные на работнике python-gearman во время его рабочего цикла. Моя причина для этого - дать мне немного контроля над моими рабочими процессами и позволить им перезагрузиться из базы данных. Мне нужно, чтобы каждый работник перезагружался через регулярные промежутки времени, но я не хочу просто убивать процессы, и я хочу, чтобы служба была постоянно доступна, что означает, что мне нужно перезагружать пакетами. Таким образом, у меня будет 4 перезагружаемых рабочих, в то время как еще 4 рабочих доступны для обработки, а затем перезагрузит следующих 4 рабочих.

Процесс:

  1. Начните процесс перезагрузки 4 раза.
    1. отменить регистрацию reload процесс
    2. перезагрузить набор данных
    3. зарегистрировать finishReload задача
    4. вернуть
  2. Повторите шаг 1, пока не будет рабочих с reload задание зарегистрировано
  3. Начните finishReload(1) задание, пока нет рабочих с finishReload задание доступно.

(1) задача finishReload отменяет регистрацию finishReload задание и регистрирует reload задание, а затем возвращается.

Теперь проблема, с которой я сталкиваюсь, заключается в том, что задание не выполняется, когда я изменяю задачи, доступные для рабочего процесса. Там нет сообщений об ошибках или исключений, просто "ОШИБКА" в журнале передач. Вот быстрая программа, которая копирует проблему.

РАБОЧИЙ

import gearman 
def reversify(gmWorker, gmJob): 
        return "".join(gmJob.data[::-1]) 
def strcount(gmWorker, gmJob): 
        gmWorker.unregister_task('reversify')  # problem line 
        return str(len(gmJob.data)) 
worker = gearman.GearmanWorker(['localhost:4730']) 
worker.register_task('reversify', reversify) 
worker.register_task('strcount', strcount) 
while True: 
        worker.work() 

КЛИЕНТ

import gearman 
client = gearman.GearmanClient(['localhost:4730']) 
a = client.submit_job('reversify', 'spam and eggs') 
print a.result 
>>> sgge dna maps 

a = client.submit_job('strcount', 'spam and eggs') 
...

Пожалуйста, дайте мне знать, если есть что-то, что я могу объяснить.

РЕДАКТИРОВАТЬ: Я знаю, что кто-то попросит посмотреть журнал я упоминал. Я также отправил этот вопрос в группу gearman в Google, и там есть журнал.

2 ответа

Решение

Это похоже на создание подкласса класса GearmanWorker, и добавление нескольких флагов может обойти эту проблему. Мне нужно разрешить завершить работу, прежде чем я начну вводить новые команды от рабочего к серверу, что, кажется, прерывает текущую работу. Так что, если мы перезаписать on_job_complete функция, которую мы можем проверить на флаг включения / выключения и действовать на них после того, как мы выпустим send_job_complete команда. Новая рабочая программа выглядит следующим образом:

РАБОЧИЙ

import gearman

def reversify(gmWorker, gmJob):
        return "".join(gmJob.data[::-1])

def enable_reversify(gmWorker, gmJob):
        myWorker.enableReversify = 1
        return 'OK'

def strcount(gmWorker, gmJob):
        myWorker.enableReversify = -1
        return str(len(gmJob.data))

class myWorker(gearman.GearmanWorker):

        enableReversify = 0 # 0 = do nothing, -1 = turn off, 1 = turn on

        def on_job_complete(self, current_job, job_result):
                self.send_job_complete(current_job, job_result)
                ### check the flag here and enable or disable tasks ###
                if myWorker.enableReversify == -1:
                        self.unregister_task('reversify')
                if myWorker.enableReversify == 1:
                        self.register_task('reversify', reversify)
                myWorker.enableReversify = 0 # reset the flag
                return True

worker = myWorker(['localhost:4730']) 
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)
worker.register_task('enableReversify', enable_reversify)

while True:
        worker.work() 

На первый взгляд кажется, что проблема заключается в том, что вы начинаете работу, а затем отменяете регистрацию способности рабочих выполнять эту работу с сервера заданий до ее завершения.

Другие вопросы по тегам