Макросы воздушного потока в Python Operator

Я пытаюсь использовать макросы Airflow в своем операторе Python, но получаю сообщение "airflow: error: нераспознанные аргументы:"

Поэтому я импортирую функцию, которая имеет 3 позиционных аргумента: (sys.argv, start_date, end_date), и я надеюсь сделать start_date и end_date датой выполнения в Airflow.

Аргументы функции выглядят примерно так

def main(argv,start_date,end_date):

Вот задача, которую я имею в DAG:

t1 = PythonOperator(
    task_id='Pull_DCM_Report',
    provide_context=True,
    python_callable=main,
    op_args=[sys.argv,'{{ ds }}','{{ ds }}'],
    dag=dag)

2 ответа

Поскольку вы передаете даты, которые должны быть отображены с помощью Airflow, вы захотите использовать templates_dict параметр в операторе Python. Это единственное поле, которое Airflow распознает как содержащие шаблоны.

Вы можете создать собственный оператор Python, который распознает больше полей в качестве шаблонов, скопировав существующий оператор и добавив соответствующие поля в template_fields кортеж.

def main(**kwargs):
    argv = kwargs.get('templates_dict').get('argv')
    start_date = kwargs.get('templates_dict').get('start_date')
    end_date = kwargs.get('templates_dict').get('end_date')


t1 = PythonOperator(task_id='Pull_DCM_Report',
                    provide_context=True,
                    python_callable=main,
                    templates_dict={'argv': sys.argv,
                                    'start_date': '{{ yesterday_ds }}',
                                    'end_date': '{{ ds }}'},
                    dag=dag)

Вы можете "обернуть" звонок на main функция со следующим:

t1 = PythonOperator(
    task_id='Pull_DCM_Report',
    provide_context=True,
    python_callable=lambda **context: main([], context["ds"], context["ds"]),
    dag=dag)

Если лямбда не ваша чашка чая, вы можете определить функцию, вызвать ее и попросить main,

Другие вопросы по тегам