Выполнение_даты в потоке воздуха: необходимо получить доступ в качестве переменной

Я действительно новичок на этом форуме. Но я играл с воздушным потоком некоторое время для нашей компании. Извините, если этот вопрос звучит очень глупо.

Я пишу конвейер, используя кучу BashOperators. В основном, для каждой Задачи я хочу просто вызвать API REST, используя 'curl'

Вот как выглядит мой конвейер (очень упрощенная версия):

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
    'email': ['xxxx@xxx.xxx'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}


current_datetime = datetime_obj.now(tz=tz.tzlocal())

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))

curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'


t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)

Если вы заметили, что я делаю current_datetime= datetime_obj.now(tz=tz.tzlocal())Вместо того, что я хочу здесь, это "execute_date"

Как я могу использовать 'execute_date' напрямую и назначить его переменной в моем файле python?

У меня есть эта общая проблема доступа к аргументам. Любая помощь будет искренне оценена.

Спасибо

8 ответов

BashOperator"s bash_command Аргумент является шаблоном. Вы можете получить доступ execution_date в любом шаблоне как datetime объект с помощью execution_date переменная. В шаблоне вы можете использовать любой jinja2 методы манипулирования этим.

Используя следующее как ваш BashOperatorbash_command строка:

# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}

# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}

Если вы просто хотите строку, эквивалентную дате выполнения, ds возвратит метку даты (ГГГГ-ММ-ДД), ds_nodash возвращает то же самое без черточек (ГГГГММДД) и т. д. Подробнее о macros доступно в Api Docs.


Ваш последний оператор будет выглядеть так:

command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)

Конструктор PythonOperator принимает параметр 'provide_context' (см. https://pythonhosted.org/airflow/code.html). Если это True, тогда он передает несколько параметров в python_callable через kwargs. Я думаю, что вы хотите kwargs ['execute_date'].

Что-то вроде этого:

def python_method(ds, **kwargs):
    Variable.set('execution_date', kwargs['execution_date'])
    return

doit = PythonOperator(
    task_id='doit',
    provide_context=True,
    python_callable=python_method,
    dag=dag)

Я не уверен, как это сделать с BashOperator, но вы можете начать с этой проблемы: https://github.com/airbnb/airflow/issues/775

Я думаю, что вы не можете назначать переменные со значениями из контекста воздушного потока вне экземпляра задачи, они доступны только во время выполнения. В основном, есть два разных шага, когда даг загружается и выполняется в потоке воздуха:

  • Сначала ваш файл dag интерпретируется и анализируется. Он должен работать и компилироваться, а определения задач должны быть правильными (без синтаксической ошибки или чего-либо еще). На этом этапе, если вы вызовете функцию для заполнения некоторых значений, эти функции не смогут получить доступ к контексту воздушного потока (например, к дате выполнения, даже больше, если вы выполняете некоторую обратную засыпку).

  • Второй шаг - выполнение Дага. Только на этом втором шаге переменные, предоставляемые потоком воздуха (execution_date, ds, etc...) доступны, так как они связаны с казнью.

Таким образом, вы не можете инициализировать глобальные переменные, используя контекст Airflow, однако, Airflow предоставляет вам несколько механизмов для достижения одного и того же эффекта:

  1. Используя шаблон jinja в вашей команде (он может быть в строке в коде или в файле, оба будут обработаны). У вас есть список доступных шаблонов здесь: https://airflow.apache.org/code.html. Обратите внимание, что некоторые функции также доступны, особенно для вычисления дней дельта и форматирования даты.

  2. Использование PythonOperator, в котором вы передаете контекст (с provide_context аргумент). Это позволит вам получить доступ к тому же шаблону с синтаксисом kwargs['<variable_name'], Если вам это нужно, вы можете вернуть значение из PythonOperator, оно будет сохранено в переменной XCOM, которую вы сможете использовать позже в любом шаблоне. Доступ к переменным XCOM использует следующий синтаксис: https://airflow.apache.org/concepts.html

  3. Если вы напишите свой собственный оператор, вы можете получить доступ к переменным воздушного потока с помощью context,

def execute(self, context):
    execution_date = context.get("execution_date")

Это должно быть внутри метода execute() оператора

Чтобы напечатать дату выполнения внутри вызываемой функции вашего PythonOperator Вы можете использовать следующее в вашем скрипте Airflow, а также можете добавить start_time а также end_time следующее:

def python_func(**kwargs):
    ts = kwargs["execution_date"]
    end_time = str(ts)
    start_time = str(ts.add(minutes=-30))

Я преобразовал значение datetime в строку, так как мне нужно передать его в запросе SQL. Мы можем использовать это иначе.

Вы можете рассмотреть SimpleHttpOperator https://airflow.apache.org/_api/airflow/operators/http_operator/index.html. Это так просто сделать http-запрос. вы можете передать execute_date с параметром конечной точки через шаблон.

Дата выполнения, (datetime.datetime)

 {{ execution_date }}

Вот еще один способ без контекста. использование времени последнего выполнения дага может быть очень полезным в запланированных заданиях ETL. Например, даг, который «скачивает все недавно добавленные файлы». Вместо жесткого кодирования datetime.datetime используйте дату последнего выполнения dag в качестве фильтра времени.

У Airflow Dags на самом деле есть класс под названием DagRun, к которому можно получить доступ следующим образом: dag_runs = DagRun.find(dag_id=dag_id)

Вот простой способ узнать время выполнения последнего запуска:

      def get_most_recent_dag_run(dag_id):
dag_runs = DagRun.find(dag_id=dag_id)
dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
return dag_runs[1] if len(dag_runs) > 1 else None

Затем в вашем pythonOperator вы можете динамически получить доступ к последнему выполнению dag, вызвав функцию, которую вы создали выше:

      last_execution = get_most_recent_dag_run('svb_to_s3')

Теперь это переменная!

Другие вопросы по тегам