Датчик обратного потока http воздушного потока
Наша реализация airflow отправляет http-запросы для получения услуг для выполнения задач. Мы хотим, чтобы эти службы сообщали airflow, когда они завершают свою задачу, поэтому мы отправляем URL-адрес обратного вызова в службу, которую они будут вызывать, когда их задача будет выполнена. Однако я не могу найти датчик обратного вызова. Как люди справляются с этим нормально?
1 ответ
В Airflow нет такого понятия, как обратный вызов или датчик веб-крюка. Определение датчика следует из документации:
Датчики - это оператор определенного типа, который будет работать до тех пор, пока не будет достигнут определенный критерий. Примеры включают в себя конкретную посадку файлов в HDFS или S3, раздел, появляющийся в Hive, или определенное время дня. Датчики являются производными от BaseSensorOperator и запускают метод poke с указанным poke_interval до тех пор, пока он не вернет True.
Это означает, что датчик - это оператор, который выполняет опрос на внешних системах. В этом смысле ваши внешние службы должны иметь способ сохранять состояние для каждой выполненной задачи - как внутренней, так и внешней - чтобы датчик опроса мог проверить это состояние.
Таким образом, вы можете использовать, например, airflow.operators.HttpSensor, который опрашивает конечную точку HTTP, пока не будет выполнено условие. Или, что еще лучше, напишите свой собственный датчик, который дает вам возможность выполнять более сложную обработку и сохранять состояние.
В противном случае, если служба выводит данные в систему хранения, вы можете использовать, например, датчик, который опрашивает базу данных. Я верю, что вы поняли идею.
Я прилагаю пример пользовательского оператора, который я написал для интеграции с API Apache Livy. Датчик выполняет две функции: а) отправляет задание Spark через REST API и б) ожидает завершения задания.
Оператор расширяет SimpleHttpOperator и в то же время реализует HttpSensor, таким образом объединяя обе функции.
class LivyBatchOperator(SimpleHttpOperator):
"""
Submits a new Spark batch job through
the Apache Livy REST API.
"""
template_fields = ('args',)
ui_color = '#f4a460'
@apply_defaults
def __init__(self,
name,
className,
file,
executorMemory='1g',
driverMemory='512m',
driverCores=1,
executorCores=1,
numExecutors=1,
args=[],
conf={},
timeout=120,
http_conn_id='apache_livy',
*arguments, **kwargs):
"""
If xcom_push is True, response of an HTTP request will also
be pushed to an XCom.
"""
super(LivyBatchOperator, self).__init__(
endpoint='batches', *arguments, **kwargs)
self.http_conn_id = http_conn_id
self.method = 'POST'
self.endpoint = 'batches'
self.name = name
self.className = className
self.file = file
self.executorMemory = executorMemory
self.driverMemory = driverMemory
self.driverCores = driverCores
self.executorCores = executorCores
self.numExecutors = numExecutors
self.args = args
self.conf = conf
self.timeout = timeout
self.poke_interval = 10
def execute(self, context):
"""
Executes the task
"""
payload = {
"name": self.name,
"className": self.className,
"executorMemory": self.executorMemory,
"driverMemory": self.driverMemory,
"driverCores": self.driverCores,
"executorCores": self.executorCores,
"numExecutors": self.numExecutors,
"file": self.file,
"args": self.args,
"conf": self.conf
}
print payload
headers = {
'X-Requested-By': 'airflow',
'Content-Type': 'application/json'
}
http = HttpHook(self.method, http_conn_id=self.http_conn_id)
self.log.info("Submitting batch through Apache Livy API")
response = http.run(self.endpoint,
json.dumps(payload),
headers,
self.extra_options)
# parse the JSON response
obj = json.loads(response.content)
# get the new batch Id
self.batch_id = obj['id']
log.info('Batch successfully submitted with Id %s', self.batch_id)
# start polling the batch status
started_at = datetime.utcnow()
while not self.poke(context):
if (datetime.utcnow() - started_at).total_seconds() > self.timeout:
raise AirflowSensorTimeout('Snap. Time is OUT.')
sleep(self.poke_interval)
self.log.info("Batch %s has finished", self.batch_id)
def poke(self, context):
'''
Function that the sensors defined while deriving this class should
override.
'''
http = HttpHook(method='GET', http_conn_id=self.http_conn_id)
self.log.info("Calling Apache Livy API to get batch status")
# call the API endpoint
endpoint = 'batches/' + str(self.batch_id)
response = http.run(endpoint)
# parse the JSON response
obj = json.loads(response.content)
# get the current state of the batch
state = obj['state']
# check the batch state
if (state == 'starting') or (state == 'running'):
# if state is 'starting' or 'running'
# signal a new polling cycle
self.log.info('Batch %s has not finished yet (%s)',
self.batch_id, state)
return False
elif state == 'success':
# if state is 'success' exit
return True
else:
# for all other states
# raise an exception and
# terminate the task
raise AirflowException(
'Batch ' + str(self.batch_id) + ' failed (' + state + ')')
Надеюсь, это поможет вам немного.