Как использовать xcom pull с оператором postgres, airflow 2.0.2?

Я пытаюсь передать параметры в операторе postgres, а не в жестком коде. Это исходит из моей предыдущей задачи, как вы видите ниже: я буду очень признателен, если вы расскажете мне, что здесь не так и как это исправить?

       get_query_id_task = PythonOperator(
            task_id='get_query_id',
            python_callable=query_and_push,
            #provide_context=True,
            op_kwargs={
                'sql' : read_sql('warmupqueryid.sql')
                                }
                            )


        get_query_text_task= PostgresOperator(
            task_id='get_query_text',
            trigger_rule=TriggerRule.ALL_DONE,
            sql='rs_warm-up_query-text.sql',
            parameters={'query_ids': "{{ ti.xcom_pull(task_ids='get_query_id_task', key='return_value') }}"},
            )

My sql is:

          SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT

    FROM stl_querytext

    WHERE query in ({{ macros.render_list_sql(params.query_ids) }});

and here is my function:

          def render_list_sql(list):
      return ', '.join(list)

1 ответ

paramsАргумент не является «шаблонным», поэтому он будет отображать только строки. Так что переместите свой param прямо в SQL

          get_query_text_task= PostgresOperator(
        task_id='get_query_text',
        postgres_conn_id='redshift',
        trigger_rule=TriggerRule.ALL_DONE,
        sql="""SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
               FROM stl_querytext
               WHERE query in ({{ macros.custom_macros.render_list_sql( ti.xcom_pull(task_ids='get_query_id_task', key='return_value') }});""",
        )
Другие вопросы по тегам