Воздушный поток Dagrun для каждой базы данных вместо запланированной
Текущая проблема, с которой я столкнулся, заключается в том, что у меня есть документы в коллекции MongoDB, каждый из которых должен обрабатываться и обновляться задачами, которые необходимо запускать в ациклическом графе зависимостей. Если вышестоящей задаче не удается обработать документ, то ни одна из зависимых задач не может обработать этот документ, так как этот документ не был обновлен с использованием необходимой информации.
Если бы я использовал Airflow, у меня осталось бы два решения:
Запустите DAG для каждого документа и передайте идентификатор документа с помощью
--conf
. Проблема в том, что это не тот способ использования Airflow; Я бы никогда не запустил процесс по расписанию, и, исходя из того, как документы появляются в коллекции, я бы делал 1440 дагрунов в день.Запускайте DAG каждый период для обработки всех документов, созданных в коллекции за этот период. Это следует из того, как ожидается, что Airflow будет работать, но проблема в том, что если задача не может обработать один документ, ни одна из зависимых задач не может обработать другие документы. Кроме того, если документ занимает больше времени, чем другие документы, чтобы обработать задачу, эти другие документы ожидают в этом единственном документе, чтобы продолжить вниз по группе DAG.
Есть ли способ лучше, чем Airflow? Или есть лучший способ справиться с этим в Airflow, чем два метода, которые я сейчас вижу?
5 ответов
Из знаний, которые я получил, пытаясь ответить на этот вопрос, я пришел к выводу, что Airflow - просто не инструмент для работы.
Airflow предназначен для запланированных идемпотентных групп DAG. DagRun также должен иметь уникальныйexecution_date
; это означает запуск одного и того же DAG в одно и то же время начала (в случае, если мы получаем два документа одновременно, это в буквальном смысле невозможно. Конечно, мы можем запланировать следующий DagRun сразу по очереди, но это ограничение должно продемонстрировать, что любой попытка использовать Airflow таким образом всегда будет в некоторой степени взломом.
Самым жизнеспособным решением, которое я нашел, является использование вместо этого Prefect, который был разработан с целью преодоления некоторых ограничений Airflow:
"Prefect предполагает, что потоки могут запускаться в любое время и по любой причине".
Префектом, эквивалентным DAG, является Flow; Одним из ключевых преимуществ потока, которым мы можем воспользоваться, является простота параметризации. Затем с помощью некоторых потоков мы можем запустить Flow для каждого элемента в потоке. Вот пример потокового конвейера ETL:
import time
from prefect import task, Flow, Parameter
from threading import Thread
def stream():
for x in range(10):
yield x
time.sleep(1)
@task
def extract(x):
# If 'x' referenced a document, in this step we could load that document
return x
@task
def transform(x):
return x * 2
@task
def load(y):
print("Received y: {}".format(y))
with Flow("ETL") as flow:
x_param = Parameter('x')
e = extract(x_param)
t = transform(e)
l = load(t)
for x in stream():
thread = Thread(target=flow.run, kwargs={"x": x})
thread.start()
Мы создали систему, которая запрашивает список в MongoDB и генерирует файл python для каждого элемента, содержащего один DAG (примечание: наличие у каждого dag собственного файла python помогает повысить эффективность планировщика Airflow с его текущим дизайном) - генератор DAG работает ежечасно, прямо перед запланированным почасовым запуском всех созданных DAG.
Вы можете изменить trigger_rule
от "all_success" к "all_done"
А также может создать ветку, которая обрабатывает неудачные документы с trigger_rule
установите значение "one_failed", чтобы каким-то образом переместить процессы, которые потерпели неудачу, документы (например, переместить в "неудачную" папку и отправить уведомление)
Я бы делал 1440 дагрунов в день.
При хорошей архитектуре Airflow это вполне возможно. Точки удушья могут быть
- Executor - используйте, например, Celery Executor вместо Local Executor
- серверная база данных - отслеживайте и настраивайте при необходимости (индексы, надлежащее хранилище и т. д.)
- webserver - ну, для тысяч дагрунов, задач и т. д., возможно, используйте webeserver только для сред dev/qa, а не для производства, где у вас более высокая скорость отправки задач / дагрунов. Вместо этого вы можете использовать cli и т. Д.
Другой подход - горизонтальное масштабирование путем запуска нескольких экземпляров Airflow - разделите документы, скажем, на десять сегментов, и назначьте документы каждого раздела только одному экземпляру Airflow.
Я бы обрабатывал более тяжелые задачи параллельно и скармливал успешным операциям дальше. Насколько я знаю, вы не можете асинхронно передавать успехи нижестоящим задачам, поэтому вам все равно придется ждать завершения каждого потока до перехода вниз по потоку, но это все равно было бы более приемлемо, чем порождение 1 dag для каждой записи, что-то в этих строках:
Задача 1: прочитать фильтрацию монго по некоторой временной метке (помните идемпотентность) и задачи подачи (например, через xcom);
Задача 2: делать что-то параллельно через PythonOperator или, что еще лучше, через K8sPod, то есть:
def thread_fun(ret):
while not job_queue.empty():
job = job_queue.get()
try:
ret.append(stuff_done(job))
except:
pass
job_queue.task_done()
return ret
# Create workers and queue
threads = []
ret = [] # a mutable object
job_queue = Queue(maxsize=0)
for thr_nr in appropriate_thread_nr:
worker = threading.Thread(
target=thread_fun,
args=([ret])
)
worker.setDaemon(True)
threads.append(worker)
# Populate queue with jobs
for row in xcom_pull(task_ids=upstream_task):
job_queue.put(row)
# Start threads
for thr in threads:
thr.start()
# Wait to finish their jobs
for thr in threads:
thr.join()
xcom_push(ret)
Задача 3. Делайте больше вещей из предыдущего задания и т. Д.