Luigi: ошибка при выполнении параллазационной задачи luigi при помощи функции сборки
Я пробовал многопроцессорность luigi с помощью функции luigi.build. но я получаю некоторую библиотечную ошибку во время выполнения.
для следующего в self._add(item, is_complete): файл "/home/manoj/anaconda2/lib/python2.7/site-packages/luigi/worker.py", строка 604, в файле _add self._validate_dependency(d) "/home/manoj/anaconda2/lib/python2.7/site-packages/luigi/worker.py", строка 622, в _validate_dependency, вызывает исключение ("require () должна возвращать объекты Task")
Вот кусок кода, который я пытался достичь данной цели.
import luigi
class TaskOne(luigi.Task):
custid= luigi.Parameter()
def requires(self):
pass
def output(self):
return luigi.LocalTarget("logs/"+str(self.custid)+"_success")
def run(self):
with self.output().open('w') as f:
f.write("%s\n" % '')
class TaskTwo(luigi.Task):
def requires(self):
customersList = ['A','B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']
yield luigi.build([TaskOne(custid=cust_id) for cust_id in customersList], workers=2)
def output(self):
return luigi.LocalTarget("logs/overall_success.txt")
def run(self):
with self.output().open('w') as f:
f.write("%s\n" % "success")
if __name__ == '__main__':
luigi.run()
================================================== ======================
1 ответ
Почему вы думаете, что нужно встроить требует?
class TaskTwo(luigi.Task):
def requires(self):
customersList = ['A','B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']
return [TaskOne(custid=cust_id) for cust_id in customersList]
Если вам нужно несколько рабочих, вы можете указать это в командной строке при запуске конвейера.
luigi --module your_module TaskTwo --workers 2
requires()
должен вернуть luigi.Task
объект или список luigi.Task
объекты. Тем не мение, luigi.build()
ничего не возвращает Вам не нужно звонить luigi.build
явно выполнять задачи, потому что Luigi самостоятельно обрабатывает требования к ним. Пример задачи, описанный в https://luigi.readthedocs.io/en/stable/tasks.html показывает базовую парадигму того, как она должна работать.
Кроме того, вы должны опустить requires()
от TaskOne
, Если он не имеет зависимостей, то нет необходимости определять его.