Импортировать переменную воздушного потока в PySpark
В последнее время я играю с Airflow и PySpark. Я видел, что Airflow имеет ряд переменных. Моя цель - проанализировать одну из этих переменных и импортировать ее в мой скрипт pySpark. До сих пор я пытался отобразить значение переменной (сработало), но затем я не смог найти способ импортировать в pySpark (я хочу передать значение этой переменной другой переменной в моем скрипте pyspark). Я также прилагаю свой код (job_id
переменная, о которой я говорю).
test_bash = """
export un_id={{ti.job_id}}
echo $un_id
"""
bash_task = BashOperator(
task_id='test',
bash_command=test_bash,
xcom_push=True,
provide_context=True,
dag=dag)
def pull_function(**kwargs):
ti = kwargs['ti']
rt = ti.xcom_pull(task_ids='test')
print(rt)
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=dag
)
#############
bash_task >> pull_task
Любая идея, как я должен продолжать или я делаю что-то не так?
2 ответа
Это значение на самом деле называется run_id
и могут быть доступны через контекст или макросы.
в Pythonoperator
доступ к нему осуществляется через контекст, а в BashOperator
это доступно через Jinja шаблонов на bash_command
поле.
Больше информации о том, что доступно в макросах:
https://airflow.incubator.apache.org/code.html
Больше информации о дзиндзя:
https://airflow.incubator.apache.org/concepts.html
from airflow.models import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='run_id',
schedule_interval=None,
start_date=datetime(2017, 2, 26)
)
def my_func(**kwargs):
context = kwargs
print(context['dag_run'].run_id)
t1 = PythonOperator(
task_id='python_run_id',
python_callable=my_func,
provide_context=True,
dag=dag
)
t2 = BashOperator(
task_id='bash_run_id',
bash_command='echo {{run_id}}',
dag=dag)
t1.set_downstream(t2)
Используйте этот знак в качестве примера, и проверьте журнал для каждого оператора, вы должны увидеть run_id
напечатано в журнале.
Я не пробовал то, что предложил @kaxil, но если я правильно понял ваш вопрос, вы хотите получить run_id
переменная из Airflow и используйте ее в своем скрипте python (pySpark). Если это так, я предполагаю, что вы используете BashOperator
в spark-submit
твоя работа. При отправке искровой работы вам разрешается представить (наряду с вашей работой) некоторые аргументы. Эти аргументы появляются как системные аргументы, которые вы можете увидеть, если вы сделаете print(sys.argv)
(полезно видеть, в какой позиции находится ваша переменная). Поскольку вы уже нажали переменную с bash_task
Вы должны будете тянуть это. Поэтому, когда вы отправляете свою искровую работу, вы также должны добавить дополнительный аргумент, подобный этому:
cmd=""spark-submit your-pyspark-file.py {{ ti.xcom_pull("test") }}
retrieval = BashOperator(
namespace='randomname',
arguments=[cmd],
name='example-dag1',
task_id='name-you-desire',
provide_context=True,
get_logs=True,
dag=dag)
Затем, если вы выполнили print(sys.argv)
вы сможете увидеть свою переменную в качестве аргумента, и в своем скрипте вы можете обратиться к этой переменной с помощью sys.argv[1]
(если он находится во второй позиции, 0, если он находится в первой и т. д.).