Датчик обратного потока http воздушного потока

Наша реализация airflow отправляет http-запросы для получения услуг для выполнения задач. Мы хотим, чтобы эти службы сообщали airflow, когда они завершают свою задачу, поэтому мы отправляем URL-адрес обратного вызова в службу, которую они будут вызывать, когда их задача будет выполнена. Однако я не могу найти датчик обратного вызова. Как люди справляются с этим нормально?

1 ответ

Решение

В Airflow нет такого понятия, как обратный вызов или датчик веб-крюка. Определение датчика следует из документации:

Датчики - это оператор определенного типа, который будет работать до тех пор, пока не будет достигнут определенный критерий. Примеры включают в себя конкретную посадку файлов в HDFS или S3, раздел, появляющийся в Hive, или определенное время дня. Датчики являются производными от BaseSensorOperator и запускают метод poke с указанным poke_interval до тех пор, пока он не вернет True.

Это означает, что датчик - это оператор, который выполняет опрос на внешних системах. В этом смысле ваши внешние службы должны иметь способ сохранять состояние для каждой выполненной задачи - как внутренней, так и внешней - чтобы датчик опроса мог проверить это состояние.

Таким образом, вы можете использовать, например, airflow.operators.HttpSensor, который опрашивает конечную точку HTTP, пока не будет выполнено условие. Или, что еще лучше, напишите свой собственный датчик, который дает вам возможность выполнять более сложную обработку и сохранять состояние.

В противном случае, если служба выводит данные в систему хранения, вы можете использовать, например, датчик, который опрашивает базу данных. Я верю, что вы поняли идею.

Я прилагаю пример пользовательского оператора, который я написал для интеграции с API Apache Livy. Датчик выполняет две функции: а) отправляет задание Spark через REST API и б) ожидает завершения задания.

Оператор расширяет SimpleHttpOperator и в то же время реализует HttpSensor, таким образом объединяя обе функции.

class LivyBatchOperator(SimpleHttpOperator):
"""
Submits a new Spark batch job through
the Apache Livy REST API.

"""

template_fields = ('args',)
ui_color = '#f4a460'

@apply_defaults
def __init__(self,
             name,
             className,
             file,
             executorMemory='1g',
             driverMemory='512m',
             driverCores=1,
             executorCores=1,
             numExecutors=1,
             args=[],
             conf={},
             timeout=120,
             http_conn_id='apache_livy',
             *arguments, **kwargs):
    """
    If xcom_push is True, response of an HTTP request will also
    be pushed to an XCom.
    """
    super(LivyBatchOperator, self).__init__(
        endpoint='batches', *arguments, **kwargs)

    self.http_conn_id = http_conn_id
    self.method = 'POST'
    self.endpoint = 'batches'
    self.name = name
    self.className = className
    self.file = file
    self.executorMemory = executorMemory
    self.driverMemory = driverMemory
    self.driverCores = driverCores
    self.executorCores = executorCores
    self.numExecutors = numExecutors
    self.args = args
    self.conf = conf
    self.timeout = timeout
    self.poke_interval = 10

def execute(self, context):
    """
    Executes the task
    """

    payload = {
        "name": self.name,
        "className": self.className,
        "executorMemory": self.executorMemory,
        "driverMemory": self.driverMemory,
        "driverCores": self.driverCores,
        "executorCores": self.executorCores,
        "numExecutors": self.numExecutors,
        "file": self.file,
        "args": self.args,
        "conf": self.conf
    }
    print payload
    headers = {
        'X-Requested-By': 'airflow',
        'Content-Type': 'application/json'
    }

    http = HttpHook(self.method, http_conn_id=self.http_conn_id)

    self.log.info("Submitting batch through Apache Livy API")

    response = http.run(self.endpoint,
                        json.dumps(payload),
                        headers,
                        self.extra_options)

    # parse the JSON response
    obj = json.loads(response.content)

    # get the new batch Id
    self.batch_id = obj['id']

    log.info('Batch successfully submitted with Id %s', self.batch_id)

    # start polling the batch status
    started_at = datetime.utcnow()
    while not self.poke(context):
        if (datetime.utcnow() - started_at).total_seconds() > self.timeout:
            raise AirflowSensorTimeout('Snap. Time is OUT.')

        sleep(self.poke_interval)

    self.log.info("Batch %s has finished", self.batch_id)

def poke(self, context):
    '''
    Function that the sensors defined while deriving this class should
    override.
    '''

    http = HttpHook(method='GET', http_conn_id=self.http_conn_id)

    self.log.info("Calling Apache Livy API to get batch status")

    # call the API endpoint
    endpoint = 'batches/' + str(self.batch_id)
    response = http.run(endpoint)

    # parse the JSON response
    obj = json.loads(response.content)

    # get the current state of the batch
    state = obj['state']

    # check the batch state
    if (state == 'starting') or (state == 'running'):
        # if state is 'starting' or 'running'
        # signal a new polling cycle
        self.log.info('Batch %s has not finished yet (%s)',
                      self.batch_id, state)
        return False
    elif state == 'success':
        # if state is 'success' exit
        return True
    else:
        # for all other states
        # raise an exception and
        # terminate the task
        raise AirflowException(
            'Batch ' + str(self.batch_id) + ' failed (' + state + ')')

Надеюсь, это поможет вам немного.

Другие вопросы по тегам