Luigi: ошибка при выполнении параллазационной задачи luigi при помощи функции сборки

Я пробовал многопроцессорность luigi с помощью функции luigi.build. но я получаю некоторую библиотечную ошибку во время выполнения.

для следующего в self._add(item, is_complete): файл "/home/manoj/anaconda2/lib/python2.7/site-packages/luigi/worker.py", строка 604, в файле _add self._validate_dependency(d) "/home/manoj/anaconda2/lib/python2.7/site-packages/luigi/worker.py", строка 622, в _validate_dependency, вызывает исключение ("require () должна возвращать объекты Task")

Вот кусок кода, который я пытался достичь данной цели.

import luigi

class TaskOne(luigi.Task):
    custid= luigi.Parameter() 
    def requires(self):
        pass
    def output(self):
        return luigi.LocalTarget("logs/"+str(self.custid)+"_success")
    def run(self):
        with self.output().open('w') as f:
            f.write("%s\n" % '')
        
               
class TaskTwo(luigi.Task):  
    def requires(self): 
        customersList = ['A','B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']
        yield luigi.build([TaskOne(custid=cust_id) for cust_id in customersList], workers=2)  
    def output(self):
        return luigi.LocalTarget("logs/overall_success.txt")
    def run(self):
        with self.output().open('w') as f:
            f.write("%s\n" % "success")
         
if __name__ == '__main__':
    luigi.run()

================================================== ======================

1 ответ

Почему вы думаете, что нужно встроить требует?

class TaskTwo(luigi.Task):  
  def requires(self): 
    customersList = ['A','B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']
    return [TaskOne(custid=cust_id) for cust_id in customersList]

Если вам нужно несколько рабочих, вы можете указать это в командной строке при запуске конвейера.

luigi --module your_module TaskTwo --workers 2

requires() должен вернуть luigi.Task объект или список luigi.Task объекты. Тем не мение, luigi.build() ничего не возвращает Вам не нужно звонить luigi.build явно выполнять задачи, потому что Luigi самостоятельно обрабатывает требования к ним. Пример задачи, описанный в https://luigi.readthedocs.io/en/stable/tasks.html показывает базовую парадигму того, как она должна работать.

Кроме того, вы должны опустить requires() от TaskOne, Если он не имеет зависимостей, то нет необходимости определять его.

Другие вопросы по тегам