Макросы воздушного потока в Python Operator
Я пытаюсь использовать макросы Airflow в своем операторе Python, но получаю сообщение "airflow: error: нераспознанные аргументы:"
Поэтому я импортирую функцию, которая имеет 3 позиционных аргумента: (sys.argv, start_date, end_date), и я надеюсь сделать start_date и end_date датой выполнения в Airflow.
Аргументы функции выглядят примерно так
def main(argv,start_date,end_date):
Вот задача, которую я имею в DAG:
t1 = PythonOperator(
task_id='Pull_DCM_Report',
provide_context=True,
python_callable=main,
op_args=[sys.argv,'{{ ds }}','{{ ds }}'],
dag=dag)
2 ответа
Поскольку вы передаете даты, которые должны быть отображены с помощью Airflow, вы захотите использовать templates_dict
параметр в операторе Python. Это единственное поле, которое Airflow распознает как содержащие шаблоны.
Вы можете создать собственный оператор Python, который распознает больше полей в качестве шаблонов, скопировав существующий оператор и добавив соответствующие поля в template_fields
кортеж.
def main(**kwargs):
argv = kwargs.get('templates_dict').get('argv')
start_date = kwargs.get('templates_dict').get('start_date')
end_date = kwargs.get('templates_dict').get('end_date')
t1 = PythonOperator(task_id='Pull_DCM_Report',
provide_context=True,
python_callable=main,
templates_dict={'argv': sys.argv,
'start_date': '{{ yesterday_ds }}',
'end_date': '{{ ds }}'},
dag=dag)
Вы можете "обернуть" звонок на main
функция со следующим:
t1 = PythonOperator(
task_id='Pull_DCM_Report',
provide_context=True,
python_callable=lambda **context: main([], context["ds"], context["ds"]),
dag=dag)
Если лямбда не ваша чашка чая, вы можете определить функцию, вызвать ее и попросить main
,