Доступ к родительскому контексту dag во время создания подтега в потоке воздуха?
Я пытаюсь получить доступ во время создания subdag к некоторым данным xcom из родительского dag, я искал, чтобы добиться этого в Интернете, но я не нашел что-то.
def test(task_id):
logging.info(f' execution of task {task_id}')
def load_subdag(parent_dag_id, child_dag_id, args):
dag_subdag = DAG(
dag_id='{0}.{1}'.format(parent_dag_id, child_dag_id),
default_args=args,
schedule_interval="@daily",
)
with dag_subdag:
r = DummyOperator(task_id='random')
for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
t = PythonOperator(
task_id='load_subdag_{0}'.format(i),
default_args=args,
python_callable=print_context,
op_kwargs={'task_id': 'load_subdag_{0}'.format(i)},
dag=dag_subdag,
)
return dag_subdag
load_tasks = SubDagOperator(
task_id='load_tasks',
subdag=load_subdag(dag.dag_id,
'load_tasks', args),
default_args=args,
)
получил эту ошибку с моим кодом
1 | Traceback (most recent call last):
airflow_1 | File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 374, in process_file
airflow_1 | m = imp.load_source(mod_name, filepath)
airflow_1 | File "/usr/local/lib/python3.6/imp.py", line 172, in load_source
airflow_1 | module = _load(spec)
airflow_1 | File "<frozen importlib._bootstrap>", line 684, in _load
airflow_1 | File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
airflow_1 | File "<frozen importlib._bootstrap_external>", line 678, in exec_module
airflow_1 | File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
airflow_1 | File "/app/dags/airflow_dag_test.py", line 75, in <module>
airflow_1 | 'load_tasks', args),
airflow_1 | File "/app/dags/airflow_dag_test.py", line 55, in load_subdag
airflow_1 | for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
airflow_1 | TypeError: xcom_pull() missing 1 required positional argument: 'context'
1 ответ
Ошибка проста: вам не хватает context
аргумент требуется xcom_pull()
метод. Но вы действительно не можете просто создать context
перейти в этот метод; это Python
словарь, который Airflow
переходит к якорным методам, таким как pre_execute()
а также execute()
из BaseOperator
(родительский класс всех Operator
с).
Другими словами, context
становится доступным только тогда, когда Operator
фактически выполняется, а не во время DAG
определение И это имеет смысл, потому что в таксономии Airflow
, xcom
S являются механизмом связи между task
в режиме реального времени: разговаривать друг с другом, пока они бегут.
Но в конце дня Xcom
с, как и все остальные Airflow
модель, сохраняются в бэкэнде meta-db. Поэтому, конечно, вы можете напрямую получить его оттуда (очевидно, только XCOM task
s, которые работали в прошлом). Хотя у меня нет фрагмента кода, вы можете посмотреть на cli.py
где они использовали SQLAlchemy
ORM поиграть с моделями и backend-db. Поймите, что это будет означать, что запрос на ваш backend-db запускается каждый раз, когда DAG
-определение файла разбирается, что происходит довольно быстро.
Полезные ссылки
- Как можно установить переменную для использования только во время определенного dag_run
- Как извлечь значение xcom из другого экземпляра задачи в том же прогоне DAG (не самый последний)?
EDIT-1
Посмотрев на ваш фрагмент кода, я встревожился. Предполагая значение, возвращаемое xcom_pull()
будет часто меняться, количество task
в вашем dag
также будет продолжать меняться. Это может привести к непредсказуемому поведению (вы должны сделать немало исследований, но у меня нет хорошего представления об этом)
Я бы посоветовал вам пересмотреть весь рабочий процесс задачи и сконцентрироваться на дизайне, где - число task
с и - структура DAG
известны заранее (на момент исполнения файла определения dag). Конечно, вы можете перебрать json
файл / результат SQL
запрос (например, SQLAlchemy
упомянутое ранее) и т.д. task
s, но этот файл / db / what не должен часто меняться.
Поймите, что просто перебираете список для генерации task
с не проблематично; что НЕ возможно, это иметь структуру вашего DAG
зависит от результата upstream
task
, Например, вы не можете иметь п task
S создан в вашем DAG
основанный на восходящей задаче, вычисляющей значение n во время выполнения.
Так что это невозможно
- Динамические задачи воздушного потока во время выполнения
- Есть ли способ создания динамических рабочих процессов в Airflow
- Динамически создавать список задач
Но это возможно (включая то, что вы пытаетесь достичь; даже если то, как вы это делаете, не кажется хорошей идеей)