Python RQ - как запустить работу, когда несколько других работ завершены? Много работа зависимость зависимость вокруг?

У меня есть вложенная структура заданий в моей очереди на повторное редактирование Python. Сначала выполняется задание rncopy. После того, как это закончено, 3 зависимых регистрационных задания следуют. Когда вычисление всех этих трех заданий закончено, я хочу запустить задание, чтобы отправить уведомление веб-сокета моему веб-интерфейсу.

Моя текущая попытка:

    rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
    t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
    t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
    fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
    notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=(t1c_reg, t2_reg, fla_reg))

К сожалению, кажется, что функция зависимости от нескольких заданий никогда не была объединена с мастером. Я видел, что на данный момент есть два запроса на git. Есть ли обходной путь, который я могу использовать?

Извините, что не предоставил воспроизводимый пример.

0 ответов

Я использую этот обходной путь: если зависимости n, я создаю n-1 оберток реальной функции: каждая обертка зависит от другого задания.

Это решение немного невнятное, но оно работает.

rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)

notify = redisqueue.enqueue(first_wrapper, patient_finished, patientid,t2_reg.id,fla_reg.id, timeout=6000, depends_on=t1c_reg)

def first_wrapper(patient_finished, patientid,t2_reg_id,fla_reg_id):
    queue = Queue('YOUR-QUEUE-NAME'))
    queue.enqueue(second_wrapper, patient_finished, patientid, fla_reg_id, timeout=6000, depends_on=t2_reg_id)

def second_wrapper(patient_finished, patientid,fla_reg_id):
    queue = Queue('YOUR-QUEUE-NAME'))
    queue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=fla_reg_id)

Некоторые предостережения:

  • Я не передаю объект очереди оболочкам, потому что возникают некоторые проблемы с сериализацией; итак, очередь надо восстанавливать по имени...

  • По той же причине я передаю оберткам job.id (вместо объекта job).

Я создал "rq-manager" для решения подобных проблем с множественными и древовидными зависимостями: https://github.com/crispyDyne/rq-manager

Структура проекта с множественной зависимостью выглядит так.

def simpleTask(x):
    return 2*x
project = {'jobs':[
            {
                'blocking':True, # this job must finished before moving on.
                'func':simpleTask,'args': 0
            },
            {
                'blocking':True, # this job, and its child jobs, must finished before moving on.
                'jobs':[ # these child jobs will run in parallel
                    {'func':simpleTask,'args': 1},
                    {'func':simpleTask,'args': 2},
                    {'func':simpleTask,'args': 3}],
            },
            { # this job will only run when the blocking jobs above finish.
                'func':simpleTask,'args': 4
            }
        ]}

Затем передайте его менеджеру для завершения.

from rq_manager import manager, getProjectResults

managerJob = q.enqueue(manager,project)
projectResults = getProjectResults(managerJob)

возвращается

projectResults = [0, [2, 4, 6], 8]

Когда зависимые задания требуют результатов от родителя. Я создаю функцию, которая выполняет первое задание, а затем добавляет в проект дополнительные задания. Итак, для вашего примера:

def firstTask(patientid,imagepath):

    raw_nifti_result  = raw_nifti_copymachine(patientid,imagepath)

    moreTasks = {'jobs':[
        {'func':modality_registrator,'args':(patientid, "t1c", raw_nifti_result)},
        {'func':modality_registrator,'args':(patientid, "t2", raw_nifti_result)},
        {'func':modality_registrator,'args':(patientid, "fla", raw_nifti_result)},
    ]}

    # returning a dictionary with an "addJobs" will add those tasks to the project. 
    return {'result':raw_nifti_result, 'addJobs':moreTasks}

Проект будет выглядеть так:

project = {'jobs':[
            {'blocking':True, # this job, and its child jobs, must finished before moving on.
             'jobs':[
                {
                    'func':firstTask, 'args':(patientid, imagepath)
                    'blocking':True, # this job must finished before moving on.
                },
                # "moreTasks" will be added here
                ]
            }
            { # this job will only run when the blocking jobs above finish.
                'func':print,'args': (patient_finished, patientid)
            }
        ]}

Если финальному заданию требуются результаты предыдущих заданий, установите флаг "previousJobArgs". "finalJob" получит массив предыдущих результатов с вложенным массивом результатов его подзадачи.

def finalJob(previousResults):
    # previousResults = [ 
    #     raw_nifti_copymachine(patientid,imagepath),
    #     [
    #         modality_registrator(patientid, "t1c", raw_nifti_result),
    #         modality_registrator(patientid, "t2", raw_nifti_result),
    #         modality_registrator(patientid, "fla", raw_nifti_result),
    #     ]
    # ]
    return doSomethingWith(previousResults)

Тогда проект будет выглядеть так

project = {'jobs':[
            {
             #'blocking':True, # Blocking not needed.
             'jobs':[
                {
                    'func':firstTask, 'args':(patientid, imagepath)
                    'blocking':True, # this job must finished before moving on.
                },
                # "moreTasks" will be added here
                ]
            }
            { # This job will wait, since it needs the previous job's results. 
                'func':finalJob, 'previousJobArgs': True # it gets all the previous jobs results
            }
        ]}

Надеюсь, что https://github.com/rq/rq/issues/260 будет реализован, и мое решение будет устаревшим!

Другие вопросы по тегам