Apache airflow запускает операторов, использующих kubernetes в AWS

Я оцениваю воздушный поток Apache для производственного использования в среде данных и хотел бы знать, можно ли с помощью воздушного потока запускать операторов в автономных средах докеров в автоматически масштабируемом кластере Kubernetes.

Я нашел следующий оператор: KubernetesPodOperator которая, кажется, делает работу, но единственные примеры, которые я нашел, были на Google Cloud. Я хотел бы запустить это на AWS, однако я не нашел примеров того, как это будет сделано. Я полагаю, что AWS EKS или AWS fargate могут соответствовать всем требованиям, но не уверен.

Может кто-нибудь с опытом воздушного потока, пожалуйста, дайте мне знать, если это возможно? Я посмотрел онлайн и пока не нашел ничего ясного.

1 ответ

Мы использовали Fargate и Airflow в производстве, и на данный момент мы получили положительный опыт.

Мы использовали его для временных рабочих нагрузок, и это оказалось для нас дешевле, чем наличие выделенного кластера Kubernetes. Кроме того, нет никаких накладных расходов на управление.

Github - Airflow DAG с ECSOperatorConfig

Вы можете использовать операторов Apache Airflow DAG в любом облачном провайдере, а не только в GKE.

https://kubernetes.io/blog/2018/06/28/airflow-on-kubernetes-part-1-a-different-kind-of-operator/, а также в статьях об Airflow Kubernetes Operator приводятся основные примеры использования DAG.

Также в статье об исследовании Airflow KubernetesExecutor на AWS и Kops приведено хорошее объяснение с примером использования airflow-dags а также airflow-logs громкость на AWS.

Пример:

from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime
import time
import os

args = {
    'owner': 'airflow',
    "start_date": datetime(2018, 10, 4),
}

dag = DAG(
    dag_id='test_kubernetes_executor',
    default_args=args,
    schedule_interval=None
)

def print_stuff():
    print("Hi Airflow")

for i in range(2):
    one_task = PythonOperator(
        task_id="one_task" + str(i),
        python_callable=print_stuff,
        dag=dag
    )

    second_task = PythonOperator(
        task_id="two_task" + str(i),
        python_callable=print_stuff,
        dag=dag
    )

    third_task = PythonOperator(
        task_id="third_task" + str(i),
        python_callable=print_stuff,
        dag=dag
    )

    one_task >> second_task >> third_task
Другие вопросы по тегам