Динамические задачи воздушного потока во время выполнения
Другие вопросы о "динамических задачах", по-видимому, касаются динамического построения DAG во время графика или проектирования. Я заинтересован в динамическом добавлении задач в группу доступности базы данных во время выполнения.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
dag = DAG('test_dag', description='a test',
schedule_interval='0 0 * * *',
start_date=datetime(2018, 1, 1),
catchup=False)
def make_tasks():
du1 = DummyOperator(task_id='dummy1', dag=dag)
du2 = DummyOperator(task_id='dummy2', dag=dag)
du3 = DummyOperator(task_id='dummy3', dag=dag)
du1 >> du2 >> du3
p = PythonOperator(
task_id='python_operator',
dag=dag,
python_callable=make_tasks)
Эта наивная реализация, похоже, не работает - фиктивные задачи никогда не отображаются в пользовательском интерфейсе.
Как правильно добавить новые операторы в группу доступности базы данных во время выполнения? Является ли это возможным?
1 ответ
Это не возможно изменить DAG во время его выполнения (без большого количества работы).
dag = DAG(...
подхватывается в цикле планировщиком. Это будет иметь экземпляр задачи 'python_operator'
в этом. Этот экземпляр задачи планируется по расписанию и выполняется работником или исполнителем. Поскольку модели DAG в базе данных Airflow обновляются только планировщиком, эти добавленные фиктивные задачи не будут сохранены в DAG и не запланированы для выполнения. Они будут забыты, когда рабочий выйдет. Если вы не скопируете весь код из планировщика относительно сохранения и обновления модели… но это будет отменено в следующий раз, когда планировщик посещает файл DAG для синтаксического анализа, что может происходить раз в минуту, раз в секунду или быстрее, в зависимости от количества других Файлы DAG есть для разбора.
Airflow на самом деле хочет, чтобы каждая группа DAG оставалась примерно одинаковой между выполнениями. Он также хочет постоянно перезагружать / анализировать файлы DAG. Таким образом, хотя вы могли бы создать файл DAG, который при каждом запуске определяет задачи динамически на основе некоторых внешних данных (предпочтительно кэшируемых в модуле файла или pyc, а не сетевого ввода-вывода, как поиск в БД, вы замедляете весь цикл планирования для всех групп обеспечения доступности баз данных) это не очень хороший план, поскольку ваш график и древовидная структура запутаются, и ваш поиск в планировщике будет более обременительным.
Вы можете заставить вызываемый запускать каждую задачу...
def make_tasks(context):
du1 = DummyOperator(task_id='dummy1', dag=dag)
du2 = DummyOperator(task_id='dummy2', dag=dag)
du3 = DummyOperator(task_id='dummy3', dag=dag)
du1.execute(context)
du2.execute(context)
du3.execute(context)
p = PythonOperator(
provides_context=true,
Но это последовательно, и вы должны решить, как использовать Python, чтобы сделать их параллельными (использовать фьючерсы?), И, если возникнет исключение, вся задача не будет выполнена. Также он привязан к одному исполнителю или работнику, поэтому не использует распределение задач airflow (kubernetes, mesos, celery).
Другой способ работы с этим - добавить фиксированное количество задач (максимальное число) и использовать вызываемые элементы для короткого замыкания ненужных задач или выдвигать аргументы с помощью xcom для каждой из них, изменяя их поведение во время выполнения. но не меняя DAG.
Что касается примера кода, вы никогда не вызываете свою функцию, которая регистрирует ваши задачи в вашей группе DAG.
Для выполнения динамических задач у вас может быть один оператор, который делает что-то другое в зависимости от состояния, или у вас может быть несколько операторов, которые могут быть пропущены в зависимости от состояния с помощью ShortCircuitOperator.
Я ценю всю проделанную здесь работу, так как передо мной стоит та же задача по созданию динамически структурированных групп DAG. Я сделал достаточно ошибок, чтобы не использовать программное обеспечение против его дизайна. Если я не могу проверить весь прогон в пользовательском интерфейсе и увеличивать или уменьшать масштаб, в основном используйте функции воздушного потока, которые являются основной причиной, по которой я все равно использую его. Я могу просто написать многопроцессорный код внутри функции и тоже покончить с этим.
Все сказанное, мое решение - использовать диспетчер ресурсов, например блокировку redis, и иметь DAG, который записывает в этот диспетчер ресурсов данные о том, что запускать, как запускать и т. Д.; и иметь другой DAG или DAG, которые запускаются через определенные интервалы времени, опрашивая диспетчер ресурсов, блокируя их перед запуском и удаляя их по завершении. Таким образом, по крайней мере, я использую воздушный поток, как ожидалось, хотя его характеристики не совсем соответствуют моим потребностям. Я разбиваю проблему на более определенные части. Решения креативны, но противоречат дизайну и не проверены разработчиками. В частности, говорят о фиксированных структурированных рабочих процессах. Я не могу обойти код, который не протестирован и противоречит дизайну, если я не перепишу основной код воздушного потока и не протестирую себя.Я понимаю, что мое решение сопряжено со сложностями с блокировкой и прочим, но, по крайней мере, я знаю границы этого.