Упс... AttributeError при очистке состояния невыполненной задачи в потоке воздуха

Я пытаюсь очистить невыполненную задачу, чтобы она снова запустилась.

Я обычно делаю это с веб-интерфейсом из дерева

древовидная структура, показывающая невыполненную задачу

После выбора "Очистить" я попадаю на страницу с ошибкой:

страница ошибки

Трассировка на этой странице - та же ошибка, которую я получаю при попытке очистить эту задачу с помощью интерфейса командной строки:

[u@airflow01 ~]# airflow clear -s 2002-07-29T20:25:00 -t 
coverage_check  gom_modis_aqua_coverage_check 
[2018-01-16 16:21:04,235] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-01-16 16:21:05,192] {models.py:167} INFO - Filling up the DagBag from /root/airflow/dags
Traceback (most recent call last):
  File "/usr/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/lib/python3.4/site-packages/airflow/bin/cli.py", line 612, in clear
    include_upstream=args.upstream,
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 3173, in sub_dag
    dag = copy.deepcopy(self)
  File "/usr/lib64/python3.4/copy.py", line 166, in deepcopy
    y = copier(memo)
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 3159, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/lib64/python3.4/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/usr/lib64/python3.4/copy.py", line 246, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib64/python3.4/copy.py", line 166, in deepcopy
    y = copier(memo)
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 2202, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/lib64/python3.4/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/usr/lib64/python3.4/copy.py", line 246, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib64/python3.4/copy.py", line 182, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/usr/lib64/python3.4/copy.py", line 309, in _reconstruct
    y.__dict__.update(state)
AttributeError: 'NoneType' object has no attribute 'update'

Ищите идеи о том, что, возможно, вызвало это, что я должен сделать, чтобы решить эту задачу, и как избежать этого в будущем.

Я смог обойти эту проблему, удалив запись задачи с помощью поиска "Обзор> Экземпляры задачи", но все же хотел бы изучить проблему, как я видел это несколько раз.

Хотя мой код DAG усложняется, ниже приводится выдержка из определения оператора в dag:

    trigger_granule_dag_id = 'trigger_' + process_pass_dag_name
    coverage_check = BranchPythonOperator(
        task_id='coverage_check',
        python_callable=_coverage_check,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(hours=3),
        queue=QUEUE.PYCMR,
        op_kwargs={
            'roi':region,
            'success_branch_id': trigger_granule_dag_id
        }
    )

Полный исходный код можно просмотреть по адресу https://github.com/USF-IMARS/imars_dags. Вот ссылки на наиболее актуальные части:

2 ответа

Ниже приведен пример группы DAG, созданной для имитации ошибки, с которой вы сталкиваетесь.

import logging
import os
from datetime import datetime, timedelta

import boto3
from airflow import DAG
from airflow import configuration as conf
from airflow.operators import ShortCircuitOperator, PythonOperator, DummyOperator


def athena_data_validation(**kwargs):
    pass


start_date = datetime.now()

args = {
    'owner': 'airflow',
    'start_date': start_date,
    'depends_on_past': False,
    'wait_for_downstream': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=30)
}

dag_name = 'data_validation_dag'

schedule_interval = None  

dag = DAG(
    dag_id=dag_name,
    default_args=args,
    schedule_interval=schedule_interval)

athena_client = boto3.client('athena', region_name='us-west-2')

DAG_SCRIPTS_DIR = conf.get('core', 'DAGS_FOLDER') + "/data_validation/"

start_task = DummyOperator(task_id='Start_Task', dag=dag)

end_task = DummyOperator(task_id='End_Task', dag=dag)

data_validation_task = ShortCircuitOperator(
    task_id='Data_Validation',
    provide_context=True,
    python_callable=athena_data_validation,
    op_kwargs={
        'athena_client': athena_client,
        'sql_file': DAG_SCRIPTS_DIR + 'data_validation.sql',
        's3_output_path': 's3://XXX/YYY/'
    },
    dag=dag)
data_validation_task.set_upstream(start_task)
data_validation_task.set_downstream(end_task)

После одного успешного пробега я попытался очистить Data_Validation задание и получил ту же ошибку (см. ниже).

Я удалил athena_client создание объекта и поместил его внутри athena_data_validation функция, а затем это сработало. Поэтому, когда мы делаем clear в интерфейсе Airflow, он пытается сделать deepcopy и получить все объекты из предыдущего запуска. Я все еще пытаюсь понять, почему он не может получить копию object типа, но я нашел обходной путь, который работал на меня.

Во время некоторых операций Airflow глубоко копирует некоторые объекты. К сожалению, некоторые объекты не позволяют этого. Клиент boto - хороший пример того, что не очень хорошо копирует, объекты потока - это другое, но большие объекты с вложенными ссылками, такими как ссылка на родительскую задачу ниже, также могут вызвать проблемы.

Как правило, вы не хотите создавать экземпляр клиента в самом коде dag. Тем не менее, я не думаю, что это ваша проблема здесь. Хотя у меня нет доступа к коду pyCMR, чтобы увидеть, может ли это быть проблемой.

Другие вопросы по тегам