Гибкий трубопровод Luigi и параметры прохождения
Недавно я внедрил конвейер Луиджи для обработки одного из наших конвейеров биоинформатики. Однако в настройке этих задач есть кое-что фундаментальное, чего я не понимаю.
Допустим, у меня есть цепочка из трех задач, которые я хотел бы выполнять с несколькими работниками. Например, график зависимости для трех работников может выглядеть так:
/ TaskC -> TaskB -> TaskA
- TaskC -> TaskB -> TaskA
\ taskC -> taskB -> taskA
и я мог бы написать
class entry(luigi.Task):
in_dir = luigi.Parameter()
def requires(self):
for f in self.in_dir:
yield taskC(pass_through=f)
def run(self):
some logic using self.input().path
from each worker in the above yield
class taskA(luigi.Task):
in_file_A = luigi.Parameter()
def output(self):
return luigi.LocalTarget('outA.txt')
def run(self):
some logic generating outA.txt
class taskB(luigi.Task):
pass_through = luigi.Parameter()
def output(self):
return luigi.LocalTarget('outB.txt')
def requires(self):
return taskA(in_file_A=self.pass_through)
def run(self):
some logic using self.input().path [outA.txt]
and generating self.output().path [outB.txt]
class taskC(luigi.Task):
pass_through = luigi.Parameter()
def output(self):
return luigi.LocalTarget('outC.txt')
def requires(self):
return taskB(pass_through=self.pass_through)
def run(self):
some logic using self.input().path [outB.txt]
and generating self.output().path [outC.txt]
Если мой код живет в pipeline.py
Я мог бы запустить это с:
luigi --module pipeline entry --workers 3 --in-dir some_dir_w_input_files/
Тот факт, что я отправляю параметр pass_through
вплоть до taskA
не похоже на правильный подход. Кроме того, если когда-нибудь в будущем у меня уже есть данные, сгенерированные (отдельно) taskA
, taskB
не достаточно гибок, чтобы справиться с этой ситуацией. Возможно, я мог бы написать:
class taskB(luigi.Task):
in_file_B = luigi.Parameter() # if we already have the output of taskA
pass_through = luigi.Parameter() # if we require taskA
def output(self):
return luigi.LocalTarget('outB.txt')
def requires(self):
if self.pass_through:
return taskA(in_file_A=self.pass_through)
def run(self):
if self.input().path:
logic_input = self.input().path
else:
logic_input = self.in_file_B
some logic using 'logic_input'
and generating self.output().path [outB.txt]
Я хотел бы знать, является ли это "правильным" шаблоном дизайна для Луиджи или я совершенно не в себе.
1 ответ
Я думаю, что это в значительной степени является артефактом абстрактных задач, которые у вас здесь, в реальном мире, вам, вероятно, нужно знать в каждом месте, где вы читаете / пишете. Смотрите, например:
class DecompressTask(luigi.Task):
dirname = luigi.Parameter()
filename = luigi.Parameter()
def output(self):
return luigi.LocalTarget(os.path.join(self.dirname , self.filename + ".txt"))
def run(self):
decompress(os.path.join(self.dirname, self.filename + ".gz"),
os.path.join(self.dirname, self.filename + ".txt"))
class TranslateTask(luigi.Task):
dirname = luigi.Parameter()
filename = luigi.Parameter()
def requires(self):
return DecompressTask(dirname=self.dirname, filename=self.filename)
def output(self):
return luigi.LocalTarget(os.path.join(self.dirname + self.filename + ".translated"))
def run(self):
translate(os.path.join(self.dirname, self.filename + ".txt"),
os.path.join(self.dirname, self.filename + ".translated"))
class ProcessDirectory(luigi.WrapperTask):
dirname = luigi.Parameter()
def requires(self):
tasks = []
for file_name in os.listdir(self.dirname):
if file_name.endswith("gz"):
prefix = file_name.split(".")[0]
tasks.append(TranslateTask(filename=prefix, dirname=self.dirname))
return tasks