Переключение с Луиджи на поток воздуха

У меня есть относительно простая задача, которая начинается с запуска файлов объемом 1,2 млн. И создания конвейера для каждого из них (с несколькими этапами, на которых сохраняются промежуточные продукты). Я реализовал это в luigi: https://gist.github.com/wkerzendorf/395c85a2955002412be302d708329f7f. Мне нравится, что Луиджи использует файловую систему, чтобы увидеть, была ли выполнена задача или нет. Я также нашел реализацию, где я могу удалить промежуточный продукт, и конвейер воссоздает все зависимые продукты (так что я могу изменить конвейер). Как бы я сделал это в потоке воздуха (или, может быть, я должен придерживаться Луиджи?)?

1 ответ

Решение

Я действительно не знаю, как работает Луиджи. Я использую в основном Apache Airflow. Airflow - это система управления рабочим процессом. Это означает, что он не передает данные, не трансформирует их и не генерирует некоторые из них (хотя он создает журналы и существует концепция, называемая Xcom который позволяет обмениваться сообщениями между задачами, позволяя более нюансированные формы контроля и общего состояния.), например. Апач Нифи. Но он определяет зависимости каждой задачи, которую вы создаете, используя Operatorsнапример BashOperator, Чтобы узнать, выполнена ли задача, он проверяет сигнал, возвращенный той же задачей.

Следуя примеру того, что вы хотите сделать, реализовано в Airflow.

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import glob
import gzip
import shutil

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='example_dag', default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60))


def extract_gzs():
    for filename in glob.glob('/1002/*.gz')
        with gzip.open(filename, 'rb') as f_in, open(filename[:-3], 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)


extractGZ = PythonOperator(
    task_id='extract_gz',
    provide_context=True,
    python_callable=extract_gzs(),
dag=dag)


cmd_cmd="""
your sed script!
"""

sed_script = BashOperator(
    task_id='sed_script', 
    bash_command=cmd_cmd, 
    dag=dag)


extractGZ.set_downstream(sed_script)
  1. Импортируйте операторы, которые вы хотите использовать в Airflow (и, конечно, если вам нужны другие классы / библиотеки)
  2. Определите свой Dag. Здесь в переменной args я определил owner и start_date параметр.
  3. Затем создайте экземпляр вашего DAG. Здесь я назвал его example_dag, приписал его определение переменной, schedule_interval и по истечении которого время должно истечь (есть еще много параметров для использования в соответствии с вашими потребностями)
  4. создал функцию Python extract_gzs()
  5. создатель PythonOperator где я называю свой Python Func
  6. Делать то же самое с кодом Bash
  7. Определение зависимостей между двумя экземплярами задачи

Конечно, есть еще много способов реализовать ту же идею. Адаптировать в соответствии с потребностями! PS: здесь есть несколько примеров с Apache Airflow

Другие вопросы по тегам