Как получить доступ к ответу GET-запроса Airflow SimpleHttpOperator
Я изучаю Airflow и у меня простой вопрос. Ниже мой DAG называется dog_retriever
import airflow
from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
from datetime import datetime, timedelta
import json
default_args = {
'owner': 'Loftium',
'depends_on_past': False,
'start_date': datetime(2017, 10, 9),
'email': 'rachel@loftium.com',
'email_on_failure': False,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=3),
}
dag = DAG('dog_retriever',
schedule_interval='@once',
default_args=default_args)
t1 = SimpleHttpOperator(
task_id='get_labrador',
method='GET',
http_conn_id='http_default',
endpoint='api/breed/labrador/images',
headers={"Content-Type": "application/json"},
dag=dag)
t2 = SimpleHttpOperator(
task_id='get_breeds',
method='GET',
http_conn_id='http_default',
endpoint='api/breeds/list',
headers={"Content-Type": "application/json"},
dag=dag)
t2.set_upstream(t1)
В качестве средства тестирования Airflow я просто делаю два GET-запроса к некоторым конечным точкам в этом очень простом http://dog.ceo/ API. Цель состоит в том, чтобы научиться работать с некоторыми данными, полученными через Airflow.
Выполнение работает - мой код успешно вызывает точки назначения в задачах t1 и t2, я вижу, что они регистрируются в пользовательском интерфейсе Airflow в правильном порядке на основе set_upstream
Правило я написал.
Что я не могу понять, так это как получить ответ json на эти 2 задачи. Это кажется таким простым, но я не могу понять это. В SimpleHtttpOperator я вижу параметр для response_check, но не для чего просто напечатать, сохранить или просмотреть ответ json.
Благодарю.
3 ответа
Так как это SimpleHttpOperator, а фактический json отправляется в XCOM, и вы можете получить его оттуда. Вот строка кода для этого действия: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/http_operator.py
То, что вам нужно сделать, это установить xcom_push=True
, так что ваш первый t1 будет следующим:
t1 = SimpleHttpOperator(
task_id='get_labrador',
method='GET',
http_conn_id='http_default',
endpoint='api/breed/labrador/images',
headers={"Content-Type": "application/json"},
xcom_push=True,
dag=dag)
Вы должны быть в состоянии найти все JSON с return value
в XCOM более подробную информацию о XCOM можно найти по адресу: https://airflow.incubator.apache.org/concepts.html
Я добавляю этот ответ в первую очередь для всех, кто пытается (или хочет) вызвать группу DAG рабочего процесса Airflow из процесса и получить любые данные, являющиеся результатом деятельности группы DAG.
Это важно понимать, что HTTP POST требуется для запуска DAG и что ответ на этот пост является зашит в Airflow, то есть без изменения самого кода потока воздуха, воздушный поток никогда не будет ничего, кроме кода состояния и сообщение возвращает в запрашивающем процесс.
Воздушный поток, по-видимому, используется в основном для создания конвейеров данных для рабочих процессов ETL (извлечение, преобразование, загрузка), существующие операторы воздушного потока, например SimpleHttpOperator, могут получать данные из веб-служб RESTful, обрабатывать их и записывать в базы данных с использованием других операторов, но не возвращайте его в ответ на HTTP-запрос POST, запускающий DAG рабочего процесса.
Даже если операторы вернули эти данные в ответе, просмотр исходного кода Airflow подтверждает, что метод trigger_dag() не проверяет и не возвращает их:
apache_airflow_airflow_www_api_experimental_endpoints.py
apache_airflow_airflow_api_client_json_client.py
Все, что он возвращает, - это подтверждающее сообщение:
Сообщение Airflow DagRun получено в службе оркестрации
Поскольку Airflow является OpenSource, я полагаю, мы могли бы изменить метод trigger_dag(), чтобы он возвращал данные, но тогда мы застряли бы в поддержке разветвленной кодовой базы, и мы не смогли бы использовать облачный хостинг на основе Airflow. такие сервисы, как Cloud Composer на Google Cloud Platform, потому что он не будет включать нашу модификацию.
Хуже того, Apache Airflow даже не возвращает правильно зашифрованное сообщение о состоянии.
Когда мы успешно отправляем сообщение в воздушный поток/dags/{DAG-ID}/dag_runs
конечная точка, мы получаем ответ "200 OK", а не ответ " 201 Created", как мы должны. И Airflow "жестко кодирует" тело ответа Content с его статусным сообщением "Создано". Стандарт, однако, состоит в том, чтобы возвращать Uri вновь созданного ресурса в заголовке ответа, а не в теле ответа, что позволит телу свободно возвращать любые данные, созданные / агрегированные во время (или в результате) этого создания.
Я приписываю этот недостаток "слепому" (или тому, что я называю "наивным") подходу, основанному на Agile/MVP, который только добавляет требуемые функции, а не остается в курсе и оставляет место для более общей полезности. Поскольку Airflow в подавляющем большинстве используется для создания конвейеров данных для (и ими) специалистов по данным (не инженеров-программистов), операторы Airflow могут обмениваться данными друг с другом, используя свою собственную внутреннюю функцию XCom, как указывает полезный ответ @Chengzhi (спасибо!), но ни при каких обстоятельствах не может вернуть данные запрашивающей стороне который запустил DAG, то есть SimpleHttpOperator может извлекать данные из сторонней службы RESTful и может делиться этими данными с PythonOperator (через XCom), который обогащает, агрегирует и / или преобразует их. Затем PythonOperator может поделиться своими данными с PostgresOperator, который сохраняет результат непосредственно в базе данных. Но результат никогда не может быть возвращен процессу, который запросил выполнение работы, то есть нашей службе оркестрации, что делает Airflow бесполезным для любого варианта использования, кроме того, которым управляют его текущие пользователи.
Выводы здесь (по крайней мере для меня): 1) никогда не приписывать слишком много опыта никому или какой-либо организации. Apache - важная организация, имеющая глубокие и жизненно важные корни в разработке программного обеспечения, но они несовершенны. И 2) всегда остерегайтесь внутренних, проприетарных решений. Открытые, основанные на стандартах решения были изучены и проверены с разных точек зрения, а не только с одной.
Я потерял почти неделю в поисках разных способов сделать то, что казалось очень простым и разумным. Я надеюсь, что этот ответ сэкономит время кому-то еще.
Мне удалось получить json из API, обработать его и отправить в другой API. Ниже приводится мой комментарий:
from airflow.models import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.models import Variable
from datetime import datetime
import json
default_args = {
'start_date': datetime(2020, 1, 1)
}
def _processing_user(ti):
users_txt = ti.xcom_pull(task_ids=["fetch_user"])[0]
users = json.loads(users_txt)
if not len(users) or 'results' not in users:
raise ValueError("User is empty")
user = users['results'][0]
user_map = {
'firstname':user['name']['first'],
'lastname':user['name']['last'],
'name': user['name']['first']+user['name']['last']
}
processed_user = json.dumps(user_map)
Variable.set("user", processed_user)
with DAG('user_data_processing',
schedule_interval='@daily',
default_args=default_args,
catchup=False) as dag:
is_api_available = HttpSensor(
task_id='is_api_available',
http_conn_id='user_api',
endpoint='api/'
)
fetch_user = SimpleHttpOperator(
task_id='fetch_user',
http_conn_id='user_api',
endpoint='api/',
method='GET'
)
processing_user = PythonOperator(
task_id='processing_user',
python_callable=_processing_user
)
send_response = SimpleHttpOperator(
task_id="sendresponse",
http_conn_id="http_conn_id",
method="POST",
endpoint="apacheairflowcreatename",
data="{{ var.json.user }}",
headers={"Content-Type": "application/json"}
)
print_user = BashOperator(
task_id='log_user',
bash_command='echo "{{ var.value.user }}"',
)
is_api_available >> fetch_user >> processing_user >> send_response