Как использовать более ранний вывод Task в Luigi?
Я пишу конвейер, в котором последующим задачам нужно читать выходные данные предыдущих задач, чтобы они могли знать, какие параметры им нужно передать в своих требованиях.
Я создал упрощенный пример моей настройки ниже.
import random
import pickle
import luigi
class WriteNumbers(luigi.Task):
def requires(self):
pass
def run(self):
# pickle a random list of ints 1-100
numbers = [random.randint(1, 100) for _ in range(100)]
pickle.dump(numbers, open("./numbers.pkl", 'wb'))
def output(self):
return luigi.LocalTarget("./numbers.pkl")
class SquareNumber(luigi.Task):
number = luigi.IntParameter()
def requires(self):
pass
def run(self):
# given a number as the parameter, write a file containing its square
with open("./squared_{}".format(self.number), 'w') as f:
f.write(str(self.number ** 2))
def output(self):
return luigi.LocalTarget("./squared_{}".format(self.number))
class SquareAll(luigi.WrapperTask):
def requires(self):
yield WriteNumbers() # require the number list to be pickled first
numbers = pickle.load("./numbers.pkl") # load the number list
for n in numbers: # square each number in the number list
yield SquareNumber(number=n)
class CubeNumber(luigi.Task):
number = luigi.IntParameter()
def requires(self):
pass
def run(self):
# given a number as the parameter, write a file containing its cube
with open("./cubed_{}".format(self.number), 'w') as f:
f.write(str(self.number ** 3))
def output(self):
return luigi.LocalTarget("./cubed_{}".format(self.number))
class CubeAll(luigi.WrapperTask):
def requires(self):
yield WriteNumbers() # require the number list to be pickled first
numbers = pickle.load("./numbers.pkl") # load the number list
for n in numbers: # square each number in the number list
yield CubeNumber(number=n)
class CrunchNumbers(luigi.WrapperTask):
def requires(self):
yield SquareAll()
yield CubeAll()
if __name__ == '__main__':
luigi.run()
Когда запускается через python luigi_example.py CrunchNumbers
, 100 случайных чисел будут созданы и список маринован и сброшен на диск. SquareAll
загружает этот маринованный список и использует его для SquareNumber
задачи с необходимыми параметрами. CubeAll
ссылается на тот же файл результатов для своей аналогичной задачи.
Проблема в том, что при запуске будет сгенерировано исключение, потому что numbers.pkl
Файл еще не существует.
Как я могу позволить более поздним задачам генерировать зависимости на основе результатов более ранней задачи? Я использовал здесь случайные числа, чтобы указать, что результат не может быть известен заранее: мое настоящее приложение обрабатывает данные из API.
0 ответов
Вы используете динамические зависимости, и их нужно вызывать изrun
метод (когда результаты requires
доступны как input
), так CubeAll
а также SquareAll
должен иметь такую структуру:
class SquareAll(luigi.WrapperTask):
def requires(self):
yield WriteNumbers() # require the number list to be pickled first
def run(self):
numbers_file = self.input()[0].path
numbers = pickle.load(numbers_file) # load the number list
for n in numbers: # square each number in the number list
yield SquareNumber(number=n)