Импортировать переменную воздушного потока в PySpark

В последнее время я играю с Airflow и PySpark. Я видел, что Airflow имеет ряд переменных. Моя цель - проанализировать одну из этих переменных и импортировать ее в мой скрипт pySpark. До сих пор я пытался отобразить значение переменной (сработало), но затем я не смог найти способ импортировать в pySpark (я хочу передать значение этой переменной другой переменной в моем скрипте pyspark). Я также прилагаю свой код (job_id переменная, о которой я говорю).

test_bash = """
export un_id={{ti.job_id}}
echo $un_id
"""

bash_task = BashOperator(
    task_id='test',
    bash_command=test_bash,
    xcom_push=True,
    provide_context=True,
    dag=dag)

def pull_function(**kwargs):
    ti = kwargs['ti']
    rt = ti.xcom_pull(task_ids='test')
    print(rt)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag
)

#############
bash_task >> pull_task

Любая идея, как я должен продолжать или я делаю что-то не так?

2 ответа

Решение

Это значение на самом деле называется run_id и могут быть доступны через контекст или макросы.

в Pythonoperator доступ к нему осуществляется через контекст, а в BashOperator это доступно через Jinja шаблонов на bash_command поле.

Больше информации о том, что доступно в макросах:

https://airflow.incubator.apache.org/code.html

Больше информации о дзиндзя:

https://airflow.incubator.apache.org/concepts.html

from airflow.models import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


dag = DAG(
    dag_id='run_id',
    schedule_interval=None,
    start_date=datetime(2017, 2, 26)
)

def my_func(**kwargs):
    context = kwargs
    print(context['dag_run'].run_id)

t1 = PythonOperator(
    task_id='python_run_id',
    python_callable=my_func,
    provide_context=True,
    dag=dag
    )

t2 = BashOperator(
    task_id='bash_run_id',
    bash_command='echo {{run_id}}',
    dag=dag)

t1.set_downstream(t2)

Используйте этот знак в качестве примера, и проверьте журнал для каждого оператора, вы должны увидеть run_id напечатано в журнале.

Я не пробовал то, что предложил @kaxil, но если я правильно понял ваш вопрос, вы хотите получить run_id переменная из Airflow и используйте ее в своем скрипте python (pySpark). Если это так, я предполагаю, что вы используете BashOperator в spark-submit твоя работа. При отправке искровой работы вам разрешается представить (наряду с вашей работой) некоторые аргументы. Эти аргументы появляются как системные аргументы, которые вы можете увидеть, если вы сделаете print(sys.argv) (полезно видеть, в какой позиции находится ваша переменная). Поскольку вы уже нажали переменную с bash_task Вы должны будете тянуть это. Поэтому, когда вы отправляете свою искровую работу, вы также должны добавить дополнительный аргумент, подобный этому:

cmd=""spark-submit your-pyspark-file.py {{ ti.xcom_pull("test") }}

retrieval = BashOperator(
    namespace='randomname',
    arguments=[cmd],
    name='example-dag1',
    task_id='name-you-desire',
    provide_context=True,
    get_logs=True, 
    dag=dag)

Затем, если вы выполнили print(sys.argv) вы сможете увидеть свою переменную в качестве аргумента, и в своем скрипте вы можете обратиться к этой переменной с помощью sys.argv[1] (если он находится во второй позиции, 0, если он находится в первой и т. д.).

Другие вопросы по тегам