Luigi "Незаполненная ошибка зависимости" после обновления таблицы целевых маркеров
Я не хочу запускать два оператора SQL через две задачи, которые имеют одну взаимозависимость, но в процессе я получаю "Незаполненную ошибку зависимости" (см. Ниже):
Сначала я думал, что это связано с проблемой записи в вывод, но похоже, что обе задачи успешно записаны в целевую таблицу в моей базе данных, поэтому я не уверен, почему регистратор выдает эту ошибку?
Вот настройка для моего простого конвейера:
class RunSessionsTable(luigi.Task):
job_name = 'Creating_Sessions_Table' # unique task name or table impacted
date_param = luigi.DateParameter(default=datetime.date.today()) # today's date
def generate_job_number(self):
pacific = pytz.timezone("US/Mountain")
return pacific.localize(datetime.datetime.now()).strftime('%H')
def get_target(self):
update_id = '{}_{}_{}'.format(self.job_name,self.date_param, self.generate_job_number())
return ToSnowflake (connection_string = ToSnowflake.connection_string, target_table= self.job_name, update_id = update_id)
def output(self):
return self.get_target().touch()
def run (self):
init = ToSnowflake.engine
conn = init.connect()
results = conn.execute(sessions_query)
#conn.close()
return self.output()
class RunEventsTable(luigi.Task):
job_name = 'Creating_Events_Table' # unique task name or table impacted
date_param = luigi.DateParameter(default=datetime.date.today()) # today's date
def generate_job_number(self):
pacific = pytz.timezone("US/Mountain")
return pacific.localize(datetime.datetime.now()).strftime('%H')
def requires(self):
return [RunSessionsTable()]
def get_target(self):
update_id = '{}_{}_{}'.format(self.job_name,self.date_param, self.generate_job_number())
return ToSnowflake (connection_string = ToSnowflake.connection_string, target_table= self.job_name, update_id = update_id)
def output(self):
return self.get_target().touch()
def run (self):
init = ToSnowflake.engine
conn = init.connect()
results = conn.execute(events_query)
#conn.close()
return self.output()
if __name__ == '__main__':
luigi.build([RunEventsTable()])