Как использовать более ранний вывод Task в Luigi?

Я пишу конвейер, в котором последующим задачам нужно читать выходные данные предыдущих задач, чтобы они могли знать, какие параметры им нужно передать в своих требованиях.

Я создал упрощенный пример моей настройки ниже.

import random
import pickle
import luigi


class WriteNumbers(luigi.Task):

    def requires(self):
        pass

    def run(self):
        # pickle a random list of ints 1-100
        numbers = [random.randint(1, 100) for _ in range(100)]
        pickle.dump(numbers, open("./numbers.pkl", 'wb'))

    def output(self):
        return luigi.LocalTarget("./numbers.pkl")


class SquareNumber(luigi.Task):
    number = luigi.IntParameter()

    def requires(self):
        pass

    def run(self):
        # given a number as the parameter, write a file containing its square
        with open("./squared_{}".format(self.number), 'w') as f:
            f.write(str(self.number ** 2))

    def output(self):
        return luigi.LocalTarget("./squared_{}".format(self.number))


class SquareAll(luigi.WrapperTask):

    def requires(self):
        yield WriteNumbers()  # require the number list to be pickled first
        numbers = pickle.load("./numbers.pkl")  # load the number list
        for n in numbers:  # square each number in the number list
            yield SquareNumber(number=n)

class CubeNumber(luigi.Task):
    number = luigi.IntParameter()

    def requires(self):
        pass

    def run(self):
        # given a number as the parameter, write a file containing its cube
        with open("./cubed_{}".format(self.number), 'w') as f:
            f.write(str(self.number ** 3))

    def output(self):
        return luigi.LocalTarget("./cubed_{}".format(self.number))


class CubeAll(luigi.WrapperTask):

    def requires(self):
        yield WriteNumbers()  # require the number list to be pickled first
        numbers = pickle.load("./numbers.pkl")  # load the number list
        for n in numbers:  # square each number in the number list
            yield CubeNumber(number=n)

class CrunchNumbers(luigi.WrapperTask):
    def requires(self):
        yield SquareAll()
        yield CubeAll()

if __name__ == '__main__':
    luigi.run()

Когда запускается через python luigi_example.py CrunchNumbers, 100 случайных чисел будут созданы и список маринован и сброшен на диск. SquareAll загружает этот маринованный список и использует его для SquareNumber задачи с необходимыми параметрами. CubeAll ссылается на тот же файл результатов для своей аналогичной задачи.

Проблема в том, что при запуске будет сгенерировано исключение, потому что numbers.pkl Файл еще не существует.

Как я могу позволить более поздним задачам генерировать зависимости на основе результатов более ранней задачи? Я использовал здесь случайные числа, чтобы указать, что результат не может быть известен заранее: мое настоящее приложение обрабатывает данные из API.

0 ответов

Вы используете динамические зависимости, и их нужно вызывать изrun метод (когда результаты requires доступны как input), так CubeAll а также SquareAll должен иметь такую ​​структуру:

class SquareAll(luigi.WrapperTask):

    def requires(self):
        yield WriteNumbers()  # require the number list to be pickled first

    def run(self):
        numbers_file = self.input()[0].path
        numbers = pickle.load(numbers_file)  # load the number list
        for n in numbers:  # square each number in the number list
            yield SquareNumber(number=n)
Другие вопросы по тегам