Как использовать xcom pull с оператором postgres, airflow 2.0.2?
Я пытаюсь передать параметры в операторе postgres, а не в жестком коде. Это исходит из моей предыдущей задачи, как вы видите ниже: я буду очень признателен, если вы расскажете мне, что здесь не так и как это исправить?
get_query_id_task = PythonOperator(
task_id='get_query_id',
python_callable=query_and_push,
#provide_context=True,
op_kwargs={
'sql' : read_sql('warmupqueryid.sql')
}
)
get_query_text_task= PostgresOperator(
task_id='get_query_text',
trigger_rule=TriggerRule.ALL_DONE,
sql='rs_warm-up_query-text.sql',
parameters={'query_ids': "{{ ti.xcom_pull(task_ids='get_query_id_task', key='return_value') }}"},
)
My sql is:
SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
FROM stl_querytext
WHERE query in ({{ macros.render_list_sql(params.query_ids) }});
and here is my function:
def render_list_sql(list):
return ', '.join(list)
1 ответ
params
Аргумент не является «шаблонным», поэтому он будет отображать только строки. Так что переместите свой
param
прямо в SQL
get_query_text_task= PostgresOperator(
task_id='get_query_text',
postgres_conn_id='redshift',
trigger_rule=TriggerRule.ALL_DONE,
sql="""SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
FROM stl_querytext
WHERE query in ({{ macros.custom_macros.render_list_sql( ti.xcom_pull(task_ids='get_query_id_task', key='return_value') }});""",
)