Воздушный поток: как передать значение xcom из PostgreOperator?

Я использую Airflow 1.8.1 и хочу отправить результат запроса sql от PostgreOperator.

Вот мои задачи:

check_task = PostgresOperator(
    task_id='check_task',
    postgres_conn_id='conx',
    sql="check_task.sql",
    xcom_push=True,
    dag=dag)

def py_is_first_execution(**kwargs):
    value = kwargs['ti'].xcom_pull(task_ids='check_task')
    print 'count ----> ', value
    if value == 0:
       return 'next_task'
    else:
       return 'end-flow'

check_branch = BranchPythonOperator(
    task_id='is-first-execution',
    python_callable=py_is_first_execution,
    provide_context=True,
    dag=dag)

и вот мой сценарий sql:

select count(1) from table

когда я проверяю значение xcom из check_task это восстанавливает none значение.

2 ответа

Решение

Наконец, я создал новый датчик ExecuteSqlOperator в диспетчере плагинов под $AIRFLOW_HOME/plugins,

я использовал CheckOperator в качестве примера, и я изменил возвращаемое значение: основной запуск этого оператора был полностью противоположен тому, что мне было нужно.

Вот по умолчанию ExecuteSqlOperator: CheckOperator

а вот мой заказ SqlSensor: ReverseSqlSensor

class SqlExecuteOperator(BaseOperator):
"""
Performs checks against a db. The ``CheckOperator`` expects
a sql query that will return a single row.

Note that this is an abstract class and get_db_hook
needs to be defined. Whereas a get_db_hook is hook that gets a
single record from an external source.
:param sql: the sql to be executed
:type sql: string
"""

template_fields = ('sql',)
template_ext = ('.hql', '.sql',)
ui_color = '#fff7e6'

@apply_defaults
def __init__(
        self, sql,
        conn_id=None,
        *args, **kwargs):
    super(SqlExecuteOperator, self).__init__(*args, **kwargs)
    self.conn_id = conn_id
    self.sql = sql

def execute(self, context=None):
    logging.info('Executing SQL statement: ' + self.sql)
    records = self.get_db_hook().get_first(self.sql)
    logging.info("Record: " + str(records))
    records_int = int(records[0])
    print (records_int)
    return records_int

def get_db_hook(self):
    return BaseHook.get_hook(conn_id=self.conn_id)

Если я прав, поток воздуха автоматически отправляется в xcom, когда запрос возвращает значение. Однако, когда вы посмотрите на код постгресоператора, вы увидите, что у него есть метод execute, который вызывает метод run из PostgresHook (расширение dbapi_hook). Оба метода ничего не возвращают, так как он ничего не толкает в xcom. Чтобы исправить это, мы создали CustomPostgresSelectOperator, копию PostgresOperator, но вместо "hook.run(..)" делаем "return hook.get_records(..)".

Надеюсь, это поможет вам.

Другие вопросы по тегам