Как отключить отказ для конкретной задачи воздушного потока?

Я поставил условную задачу в потоке воздуха, описанном здесь. Все, что он делает, это проверяет, существует ли раздел улья. Если да, перейдите к остальным задачам, а если нет, сначала добавьте раздел, прежде чем продолжить. Задача условной проверки может либо провалиться, либо завершиться успешно, оба в порядке. Тем не менее, у меня настроено уведомление по электронной почте о пейджерной обязанности, потому что я хочу знать, когда не выполняются последующие задачи. Как отключить уведомление об отказе в этой конкретной условной задаче, чтобы при ложном срабатывании пейджера я не получил ложную тревогу?

1 ответ

email_on_failure а также on_failure_callback и т.д. являются параметрами уровня задачи (оператора). Они наследуют от объекта DAG, значение default_args вы перешли к DAG, но вы также можете перезаписать их при инициализации.

YourOperator(task_id='task1', dag=dag, email_on_failure=None, on_failure_callback=None, ...)

Вот исходный код того, как airflow обрабатывает эти обратные вызовы при сбое задачи, чтобы вы поняли, как она работает.

def handle_failure(self, error, test_mode=False, context=None):
        self.log.exception(error)
        task = self.task
        session = settings.Session()
        self.end_date = datetime.utcnow()
        self.set_duration()
        Stats.incr('operator_failures_{}'.format(task.__class__.__name__), 1, 1)
        Stats.incr('ti_failures')
        if not test_mode:
            session.add(Log(State.FAILED, self))

        # Log failure duration
        session.add(TaskFail(task, self.execution_date, self.start_date, self.end_date))

        # Let's go deeper
        try:
            # Since this function is called only when the TI state is running,
            # try_number contains the current try_number (not the next). We
            # only mark task instance as FAILED if the next task instance
            # try_number exceeds the max_tries.
            if task.retries and self.try_number <= self.max_tries:
                self.state = State.UP_FOR_RETRY
                self.log.info('Marking task as UP_FOR_RETRY')
                if task.email_on_retry and task.email:
                    self.email_alert(error, is_retry=True)
            else:
                self.state = State.FAILED
                if task.retries:
                    self.log.info('All retries failed; marking task as FAILED')
                else:
                    self.log.info('Marking task as FAILED.')
                if task.email_on_failure and task.email:
                    self.email_alert(error, is_retry=False)
        except Exception as e2:
            self.log.error('Failed to send email to: %s', task.email)
            self.log.exception(e2)

        # Handling callbacks pessimistically
        try:
            if self.state == State.UP_FOR_RETRY and task.on_retry_callback:
                task.on_retry_callback(context)
            if self.state == State.FAILED and task.on_failure_callback:
                task.on_failure_callback(context)
        except Exception as e3:
            self.log.error("Failed at executing callback")
            self.log.exception(e3)

        if not test_mode:
            session.merge(self)
        session.commit()
        self.log.error(str(error))

https://airflow.apache.org/_modules/airflow/models.html

Другие вопросы по тегам