Воздушный поток принимает шаблон дзиндзя как строку

В Airflow я пытаюсь использовать шаблон jinja в airflow, но проблема в том, что он не анализируется, а рассматривается как строка. Пожалуйста, смотрите мой код ``

from datetime import datetime

from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

def test_method(dag,network_id,schema_name):
    print "Schema_name in test_method", schema_name
    third_task = PythonOperator(
        task_id='first_task_' + network_id,
        provide_context=True,
        python_callable=print_context2,
        dag=dag)
    return third_task

dag = DAG('testing_xcoms_pull', description='Testing Xcoms',
          schedule_interval='0 12 * * *',
          start_date= datetime.today(),
          catchup=False)


def print_context(ds, **kwargs):
    return 'Returning from print_context'

def print_context2(ds, **kwargs):
    return 'Returning from print_context2'

def get_schema(ds, **kwargs):
    # Returning schema name based on network_id
    schema_name = "my_schema"
    return get_schema

first_task = PythonOperator(
    task_id='first_task',
    provide_context=True,
    python_callable=print_context,
    dag=dag)


second_task = PythonOperator(
    task_id='second_task',
    provide_context=True,
    python_callable=get_schema,
    dag=dag)

network_id = '{{ dag_run.conf["network_id"]}}'

first_task >> second_task >> test_method(
                    dag=dag,
                    network_id=network_id,
                    schema_name='{{ ti.xcom_pull("second_task")}}')

``

Создание Dag терпит неудачу, потому что '{{ dag_run.conf["network_id"]}}' принимается за струну потоком воздуха. Может кто-нибудь помочь мне с проблемой в моем коде???

2 ответа

Операторы воздушного потока имеют переменную с именем template_fields. Эта переменная обычно объявляется в верхней части класса операторов, посмотрите любой из операторов в кодовой базе github.

Если поле, в которое вы пытаетесь передать синтаксис шаблона Jinja, отсутствует в списке template_fields, синтаксис jinja будет отображаться в виде строки.

DAG Объект и его код определения не анализируются в контексте выполнения, он анализируется относительно среды, доступной ему при загрузке Python.

network_id переменная, которую вы используете для определения task_id в вашей функции не шаблонизируется перед выполнением, не может быть, так как выполнение не активно. Даже с шаблонизацией вам все еще нужен действительный, статичный, не шаблонизированный task_id значение для создания экземпляра DAG объект.

Другие вопросы по тегам