Проверьте один DAG, который делает сеть туда и обратно

У меня есть задача, которая читает список файлов из Azure и отправляет результаты в XCOM. Оператор конкретно AzureDataLakeStorageListOperator, Источник здесь: adls_list_operator.py

Я хочу напечатать вывод этой задачи, используя что-то вроде BashOperator но я не уверен, как сделать это локально. Насколько я могу судить, airflow test Команда выполняет только отдельные задачи, и поэтому я не могу получить выходные данные своей первой задачи при тестировании моей второй задачи.

Вот мой полный DAG:

from airflow import DAG
from airflow.contrib.operators.adls_list_operator import AzureDataLakeStorageListOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['vishaalkal@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('adls', default_args=default_args,
          schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = AzureDataLakeStorageListOperator(
    task_id='list_adls_files',
    path='reportdata/*.csv',
    dag=dag)

t2 = BashOperator(
    task_id='templated',
    bash_command='date; echo "{{ task_instance.xcom_pull("t1") }}"',
    dag=dag
)

t2.set_upstream(t1)

Мне кажется, я правильно настроил аутентификацию, так как AccessDenied ошибки до, но больше не вызывает исключение.

0 ответов

Другие вопросы по тегам