Может ли Луиджи распространить исключение или вернуть какой-либо результат?
Я использую Луиджи, чтобы запустить какой-то конвейер. Давайте возьмем простой пример
task = myTask()
w = Worker(scheduler=CentralPlannerScheduler(), worker_processes=1)
w.add(task)
w.run()
Теперь давайте скажем, что myTask
поднимает исключение во время исполнения. Все, что я могу получить - это журнал от Луиджи, показывающий исключение.
Есть ли способ, которым Луиджи может распространять это или, по крайней мере, вернуть failure
статус?
Тогда я смогу заставить мою программу реагировать в зависимости от этого состояния.
Благодарю.
РЕДАКТИРОВАТЬ Я забыл указать, что выходные данные luigi нацелены на базу данных, когда я сохраняю результат. Если возникает исключение, результат не сохраняется, но исключение не распространяется на luigi. Мне было интересно, есть ли у Луиджи возможность иметь это.
2 ответа
Из документов:
Luigi имеет встроенную систему событий, которая позволяет вам регистрировать обратные вызовы к событиям и запускать их из ваших собственных задач. Вы можете подключиться к некоторым заранее заданным событиям и создать свои собственные. Каждый дескриптор события привязан к классу Task и будет запускаться только из этого класса или его подкласса. Это позволяет без особых усилий подписываться на события только из определенного класса (например, для заданий hadoop).
Пример:
import luigi
from my_tasks import MyTask
@MyTask.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
"""Will be called directly after a failed execution
of `run` on any MyTask subclass
"""
do_something()
luigi.run()
У Луиджи есть много событий, из которых вы можете выбрать. Вы также можете взглянуть на эти тесты, чтобы научиться слушать и реагировать на другие события.
Что вы можете сделать, это записать ошибку в файл. Например, в вашей задаче может произойти сбой (назовем это TaskA):
x=""
try:
do stuff
except:
x="error!"
with open('errorfile.log','w') as f:
f.write(x)
Затем в задаче, которая зависит от этой ошибки, для этой задачи потребуется TaskA. И вы могли бы сделать что-то вроде этого:
with open('errorfile.log','r') as f:
if f.read()://if anything is in the error log from TaskA
//error occurred
do stuff
else:
do other stuff