Может ли Луиджи распространить исключение или вернуть какой-либо результат?

Я использую Луиджи, чтобы запустить какой-то конвейер. Давайте возьмем простой пример

task = myTask()
w = Worker(scheduler=CentralPlannerScheduler(), worker_processes=1)
w.add(task)
w.run()

Теперь давайте скажем, что myTask поднимает исключение во время исполнения. Все, что я могу получить - это журнал от Луиджи, показывающий исключение.

Есть ли способ, которым Луиджи может распространять это или, по крайней мере, вернуть failure статус?

Тогда я смогу заставить мою программу реагировать в зависимости от этого состояния.

Благодарю.

РЕДАКТИРОВАТЬ Я забыл указать, что выходные данные luigi нацелены на базу данных, когда я сохраняю результат. Если возникает исключение, результат не сохраняется, но исключение не распространяется на luigi. Мне было интересно, есть ли у Луиджи возможность иметь это.

2 ответа

Решение

Из документов:

Luigi имеет встроенную систему событий, которая позволяет вам регистрировать обратные вызовы к событиям и запускать их из ваших собственных задач. Вы можете подключиться к некоторым заранее заданным событиям и создать свои собственные. Каждый дескриптор события привязан к классу Task и будет запускаться только из этого класса или его подкласса. Это позволяет без особых усилий подписываться на события только из определенного класса (например, для заданий hadoop).

Пример:

import luigi

from my_tasks import MyTask


@MyTask.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    """Will be called directly after a failed execution
    of `run` on any MyTask subclass
    """

    do_something()


luigi.run()

У Луиджи есть много событий, из которых вы можете выбрать. Вы также можете взглянуть на эти тесты, чтобы научиться слушать и реагировать на другие события.

Что вы можете сделать, это записать ошибку в файл. Например, в вашей задаче может произойти сбой (назовем это TaskA):

x=""
try:
    do stuff
except:
    x="error!"
with open('errorfile.log','w') as f:
    f.write(x)

Затем в задаче, которая зависит от этой ошибки, для этой задачи потребуется TaskA. И вы могли бы сделать что-то вроде этого:

with open('errorfile.log','r') as f:
    if f.read()://if anything is in the error log from TaskA
        //error occurred
        do stuff
    else:
        do other stuff
Другие вопросы по тегам