Luigi "Незаполненная ошибка зависимости" после обновления таблицы целевых маркеров

Я не хочу запускать два оператора SQL через две задачи, которые имеют одну взаимозависимость, но в процессе я получаю "Незаполненную ошибку зависимости" (см. Ниже):

Сначала я думал, что это связано с проблемой записи в вывод, но похоже, что обе задачи успешно записаны в целевую таблицу в моей базе данных, поэтому я не уверен, почему регистратор выдает эту ошибку?

Вот настройка для моего простого конвейера:

class RunSessionsTable(luigi.Task):
    job_name = 'Creating_Sessions_Table' # unique task name or table impacted
    date_param = luigi.DateParameter(default=datetime.date.today()) # today's date

    def generate_job_number(self):
        pacific = pytz.timezone("US/Mountain")
        return pacific.localize(datetime.datetime.now()).strftime('%H')

    def get_target(self):   
        update_id = '{}_{}_{}'.format(self.job_name,self.date_param, self.generate_job_number())
        return ToSnowflake (connection_string = ToSnowflake.connection_string, target_table= self.job_name, update_id = update_id)      

    def output(self):
        return self.get_target().touch()

    def run (self):
        init = ToSnowflake.engine
        conn = init.connect()
        results = conn.execute(sessions_query)
        #conn.close()
        return self.output()
class RunEventsTable(luigi.Task):
    job_name = 'Creating_Events_Table' # unique task name or table impacted
    date_param = luigi.DateParameter(default=datetime.date.today()) # today's date

    def generate_job_number(self):
        pacific = pytz.timezone("US/Mountain")
        return pacific.localize(datetime.datetime.now()).strftime('%H')

    def requires(self):
        return [RunSessionsTable()]

    def get_target(self):   
        update_id = '{}_{}_{}'.format(self.job_name,self.date_param, self.generate_job_number())
        return ToSnowflake (connection_string = ToSnowflake.connection_string, target_table= self.job_name, update_id = update_id)      

    def output(self):
        return self.get_target().touch()

    def run (self):
        init = ToSnowflake.engine
        conn = init.connect()
        results = conn.execute(events_query)
        #conn.close()
        return self.output()


if __name__ == '__main__':
    luigi.build([RunEventsTable()])

0 ответов

Другие вопросы по тегам