Воздушный поток: как передать значение xcom из PostgreOperator?
Я использую Airflow 1.8.1 и хочу отправить результат запроса sql от PostgreOperator.
Вот мои задачи:
check_task = PostgresOperator(
task_id='check_task',
postgres_conn_id='conx',
sql="check_task.sql",
xcom_push=True,
dag=dag)
def py_is_first_execution(**kwargs):
value = kwargs['ti'].xcom_pull(task_ids='check_task')
print 'count ----> ', value
if value == 0:
return 'next_task'
else:
return 'end-flow'
check_branch = BranchPythonOperator(
task_id='is-first-execution',
python_callable=py_is_first_execution,
provide_context=True,
dag=dag)
и вот мой сценарий sql:
select count(1) from table
когда я проверяю значение xcom из check_task
это восстанавливает none
значение.
2 ответа
Наконец, я создал новый датчик ExecuteSqlOperator
в диспетчере плагинов под $AIRFLOW_HOME/plugins
,
я использовал CheckOperator
в качестве примера, и я изменил возвращаемое значение: основной запуск этого оператора был полностью противоположен тому, что мне было нужно.
Вот по умолчанию ExecuteSqlOperator
: CheckOperator
а вот мой заказ SqlSensor
: ReverseSqlSensor
class SqlExecuteOperator(BaseOperator):
"""
Performs checks against a db. The ``CheckOperator`` expects
a sql query that will return a single row.
Note that this is an abstract class and get_db_hook
needs to be defined. Whereas a get_db_hook is hook that gets a
single record from an external source.
:param sql: the sql to be executed
:type sql: string
"""
template_fields = ('sql',)
template_ext = ('.hql', '.sql',)
ui_color = '#fff7e6'
@apply_defaults
def __init__(
self, sql,
conn_id=None,
*args, **kwargs):
super(SqlExecuteOperator, self).__init__(*args, **kwargs)
self.conn_id = conn_id
self.sql = sql
def execute(self, context=None):
logging.info('Executing SQL statement: ' + self.sql)
records = self.get_db_hook().get_first(self.sql)
logging.info("Record: " + str(records))
records_int = int(records[0])
print (records_int)
return records_int
def get_db_hook(self):
return BaseHook.get_hook(conn_id=self.conn_id)
Если я прав, поток воздуха автоматически отправляется в xcom, когда запрос возвращает значение. Однако, когда вы посмотрите на код постгресоператора, вы увидите, что у него есть метод execute, который вызывает метод run из PostgresHook (расширение dbapi_hook). Оба метода ничего не возвращают, так как он ничего не толкает в xcom. Чтобы исправить это, мы создали CustomPostgresSelectOperator, копию PostgresOperator, но вместо "hook.run(..)" делаем "return hook.get_records(..)".
Надеюсь, это поможет вам.