Упс... AttributeError при очистке состояния невыполненной задачи в потоке воздуха
Я пытаюсь очистить невыполненную задачу, чтобы она снова запустилась.
Я обычно делаю это с веб-интерфейсом из дерева
После выбора "Очистить" я попадаю на страницу с ошибкой:
Трассировка на этой странице - та же ошибка, которую я получаю при попытке очистить эту задачу с помощью интерфейса командной строки:
[u@airflow01 ~]# airflow clear -s 2002-07-29T20:25:00 -t
coverage_check gom_modis_aqua_coverage_check
[2018-01-16 16:21:04,235] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-01-16 16:21:05,192] {models.py:167} INFO - Filling up the DagBag from /root/airflow/dags
Traceback (most recent call last):
File "/usr/bin/airflow", line 28, in <module>
args.func(args)
File "/usr/lib/python3.4/site-packages/airflow/bin/cli.py", line 612, in clear
include_upstream=args.upstream,
File "/usr/lib/python3.4/site-packages/airflow/models.py", line 3173, in sub_dag
dag = copy.deepcopy(self)
File "/usr/lib64/python3.4/copy.py", line 166, in deepcopy
y = copier(memo)
File "/usr/lib/python3.4/site-packages/airflow/models.py", line 3159, in __deepcopy__
setattr(result, k, copy.deepcopy(v, memo))
File "/usr/lib64/python3.4/copy.py", line 155, in deepcopy
y = copier(x, memo)
File "/usr/lib64/python3.4/copy.py", line 246, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/lib64/python3.4/copy.py", line 166, in deepcopy
y = copier(memo)
File "/usr/lib/python3.4/site-packages/airflow/models.py", line 2202, in __deepcopy__
setattr(result, k, copy.deepcopy(v, memo))
File "/usr/lib64/python3.4/copy.py", line 155, in deepcopy
y = copier(x, memo)
File "/usr/lib64/python3.4/copy.py", line 246, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/lib64/python3.4/copy.py", line 182, in deepcopy
y = _reconstruct(x, rv, 1, memo)
File "/usr/lib64/python3.4/copy.py", line 309, in _reconstruct
y.__dict__.update(state)
AttributeError: 'NoneType' object has no attribute 'update'
Ищите идеи о том, что, возможно, вызвало это, что я должен сделать, чтобы решить эту задачу, и как избежать этого в будущем.
Я смог обойти эту проблему, удалив запись задачи с помощью поиска "Обзор> Экземпляры задачи", но все же хотел бы изучить проблему, как я видел это несколько раз.
Хотя мой код DAG усложняется, ниже приводится выдержка из определения оператора в dag:
trigger_granule_dag_id = 'trigger_' + process_pass_dag_name
coverage_check = BranchPythonOperator(
task_id='coverage_check',
python_callable=_coverage_check,
provide_context=True,
retries=10,
retry_delay=timedelta(hours=3),
queue=QUEUE.PYCMR,
op_kwargs={
'roi':region,
'success_branch_id': trigger_granule_dag_id
}
)
Полный исходный код можно просмотреть по адресу https://github.com/USF-IMARS/imars_dags. Вот ссылки на наиболее актуальные части:
- Оператор создан в /gom/gom_modis_aqua_coverage_check.py с использованием фабрики modis_aqua_coverage_check
- Функция фабрики определяет охват_черки BranchPythonOperator в /builders/modis_aqua_coverage_check.py
- python_callable - функция _coverage_check в том же файле
2 ответа
Ниже приведен пример группы DAG, созданной для имитации ошибки, с которой вы сталкиваетесь.
import logging
import os
from datetime import datetime, timedelta
import boto3
from airflow import DAG
from airflow import configuration as conf
from airflow.operators import ShortCircuitOperator, PythonOperator, DummyOperator
def athena_data_validation(**kwargs):
pass
start_date = datetime.now()
args = {
'owner': 'airflow',
'start_date': start_date,
'depends_on_past': False,
'wait_for_downstream': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(seconds=30)
}
dag_name = 'data_validation_dag'
schedule_interval = None
dag = DAG(
dag_id=dag_name,
default_args=args,
schedule_interval=schedule_interval)
athena_client = boto3.client('athena', region_name='us-west-2')
DAG_SCRIPTS_DIR = conf.get('core', 'DAGS_FOLDER') + "/data_validation/"
start_task = DummyOperator(task_id='Start_Task', dag=dag)
end_task = DummyOperator(task_id='End_Task', dag=dag)
data_validation_task = ShortCircuitOperator(
task_id='Data_Validation',
provide_context=True,
python_callable=athena_data_validation,
op_kwargs={
'athena_client': athena_client,
'sql_file': DAG_SCRIPTS_DIR + 'data_validation.sql',
's3_output_path': 's3://XXX/YYY/'
},
dag=dag)
data_validation_task.set_upstream(start_task)
data_validation_task.set_downstream(end_task)
После одного успешного пробега я попытался очистить Data_Validation
задание и получил ту же ошибку (см. ниже).
Я удалил athena_client
создание объекта и поместил его внутри athena_data_validation
функция, а затем это сработало. Поэтому, когда мы делаем clear
в интерфейсе Airflow, он пытается сделать deepcopy
и получить все объекты из предыдущего запуска. Я все еще пытаюсь понять, почему он не может получить копию object
типа, но я нашел обходной путь, который работал на меня.
Во время некоторых операций Airflow глубоко копирует некоторые объекты. К сожалению, некоторые объекты не позволяют этого. Клиент boto - хороший пример того, что не очень хорошо копирует, объекты потока - это другое, но большие объекты с вложенными ссылками, такими как ссылка на родительскую задачу ниже, также могут вызвать проблемы.
Как правило, вы не хотите создавать экземпляр клиента в самом коде dag. Тем не менее, я не думаю, что это ваша проблема здесь. Хотя у меня нет доступа к коду pyCMR, чтобы увидеть, может ли это быть проблемой.