Apache airflow запускает операторов, использующих kubernetes в AWS
Я оцениваю воздушный поток Apache для производственного использования в среде данных и хотел бы знать, можно ли с помощью воздушного потока запускать операторов в автономных средах докеров в автоматически масштабируемом кластере Kubernetes.
Я нашел следующий оператор: KubernetesPodOperator
которая, кажется, делает работу, но единственные примеры, которые я нашел, были на Google Cloud. Я хотел бы запустить это на AWS, однако я не нашел примеров того, как это будет сделано. Я полагаю, что AWS EKS или AWS fargate могут соответствовать всем требованиям, но не уверен.
Может кто-нибудь с опытом воздушного потока, пожалуйста, дайте мне знать, если это возможно? Я посмотрел онлайн и пока не нашел ничего ясного.
1 ответ
Мы использовали Fargate и Airflow в производстве, и на данный момент мы получили положительный опыт.
Мы использовали его для временных рабочих нагрузок, и это оказалось для нас дешевле, чем наличие выделенного кластера Kubernetes. Кроме того, нет никаких накладных расходов на управление.
Вы можете использовать операторов Apache Airflow DAG в любом облачном провайдере, а не только в GKE.
https://kubernetes.io/blog/2018/06/28/airflow-on-kubernetes-part-1-a-different-kind-of-operator/, а также в статьях об Airflow Kubernetes Operator приводятся основные примеры использования DAG.
Также в статье об исследовании Airflow KubernetesExecutor на AWS и Kops приведено хорошее объяснение с примером использования airflow-dags
а также airflow-logs
громкость на AWS.
Пример:
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime
import time
import os
args = {
'owner': 'airflow',
"start_date": datetime(2018, 10, 4),
}
dag = DAG(
dag_id='test_kubernetes_executor',
default_args=args,
schedule_interval=None
)
def print_stuff():
print("Hi Airflow")
for i in range(2):
one_task = PythonOperator(
task_id="one_task" + str(i),
python_callable=print_stuff,
dag=dag
)
second_task = PythonOperator(
task_id="two_task" + str(i),
python_callable=print_stuff,
dag=dag
)
third_task = PythonOperator(
task_id="third_task" + str(i),
python_callable=print_stuff,
dag=dag
)
one_task >> second_task >> third_task