Воздушный поток принимает шаблон дзиндзя как строку
В Airflow я пытаюсь использовать шаблон jinja в airflow, но проблема в том, что он не анализируется, а рассматривается как строка. Пожалуйста, смотрите мой код ``
from datetime import datetime
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
def test_method(dag,network_id,schema_name):
print "Schema_name in test_method", schema_name
third_task = PythonOperator(
task_id='first_task_' + network_id,
provide_context=True,
python_callable=print_context2,
dag=dag)
return third_task
dag = DAG('testing_xcoms_pull', description='Testing Xcoms',
schedule_interval='0 12 * * *',
start_date= datetime.today(),
catchup=False)
def print_context(ds, **kwargs):
return 'Returning from print_context'
def print_context2(ds, **kwargs):
return 'Returning from print_context2'
def get_schema(ds, **kwargs):
# Returning schema name based on network_id
schema_name = "my_schema"
return get_schema
first_task = PythonOperator(
task_id='first_task',
provide_context=True,
python_callable=print_context,
dag=dag)
second_task = PythonOperator(
task_id='second_task',
provide_context=True,
python_callable=get_schema,
dag=dag)
network_id = '{{ dag_run.conf["network_id"]}}'
first_task >> second_task >> test_method(
dag=dag,
network_id=network_id,
schema_name='{{ ti.xcom_pull("second_task")}}')
``
Создание Dag терпит неудачу, потому что '{{ dag_run.conf["network_id"]}}'
принимается за струну потоком воздуха. Может кто-нибудь помочь мне с проблемой в моем коде???
2 ответа
Операторы воздушного потока имеют переменную с именем template_fields. Эта переменная обычно объявляется в верхней части класса операторов, посмотрите любой из операторов в кодовой базе github.
Если поле, в которое вы пытаетесь передать синтаксис шаблона Jinja, отсутствует в списке template_fields, синтаксис jinja будет отображаться в виде строки.
DAG
Объект и его код определения не анализируются в контексте выполнения, он анализируется относительно среды, доступной ему при загрузке Python.
network_id
переменная, которую вы используете для определения task_id
в вашей функции не шаблонизируется перед выполнением, не может быть, так как выполнение не активно. Даже с шаблонизацией вам все еще нужен действительный, статичный, не шаблонизированный task_id
значение для создания экземпляра DAG
объект.