Переключение с Луиджи на поток воздуха
У меня есть относительно простая задача, которая начинается с запуска файлов объемом 1,2 млн. И создания конвейера для каждого из них (с несколькими этапами, на которых сохраняются промежуточные продукты). Я реализовал это в luigi: https://gist.github.com/wkerzendorf/395c85a2955002412be302d708329f7f. Мне нравится, что Луиджи использует файловую систему, чтобы увидеть, была ли выполнена задача или нет. Я также нашел реализацию, где я могу удалить промежуточный продукт, и конвейер воссоздает все зависимые продукты (так что я могу изменить конвейер). Как бы я сделал это в потоке воздуха (или, может быть, я должен придерживаться Луиджи?)?
1 ответ
Я действительно не знаю, как работает Луиджи. Я использую в основном Apache Airflow. Airflow - это система управления рабочим процессом. Это означает, что он не передает данные, не трансформирует их и не генерирует некоторые из них (хотя он создает журналы и существует концепция, называемая Xcom
который позволяет обмениваться сообщениями между задачами, позволяя более нюансированные формы контроля и общего состояния.), например. Апач Нифи. Но он определяет зависимости каждой задачи, которую вы создаете, используя Operators
например BashOperator
, Чтобы узнать, выполнена ли задача, он проверяет сигнал, возвращенный той же задачей.
Следуя примеру того, что вы хотите сделать, реализовано в Airflow.
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import glob
import gzip
import shutil
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='example_dag', default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60))
def extract_gzs():
for filename in glob.glob('/1002/*.gz')
with gzip.open(filename, 'rb') as f_in, open(filename[:-3], 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
extractGZ = PythonOperator(
task_id='extract_gz',
provide_context=True,
python_callable=extract_gzs(),
dag=dag)
cmd_cmd="""
your sed script!
"""
sed_script = BashOperator(
task_id='sed_script',
bash_command=cmd_cmd,
dag=dag)
extractGZ.set_downstream(sed_script)
- Импортируйте операторы, которые вы хотите использовать в Airflow (и, конечно, если вам нужны другие классы / библиотеки)
- Определите свой Dag. Здесь в переменной
args
я определилowner
иstart_date
параметр. - Затем создайте экземпляр вашего DAG. Здесь я назвал его example_dag, приписал его определение переменной, schedule_interval и по истечении которого время должно истечь (есть еще много параметров для использования в соответствии с вашими потребностями)
- создал функцию Python extract_gzs()
- создатель
PythonOperator
где я называю свой Python Func - Делать то же самое с кодом Bash
- Определение зависимостей между двумя экземплярами задачи
Конечно, есть еще много способов реализовать ту же идею. Адаптировать в соответствии с потребностями! PS: здесь есть несколько примеров с Apache Airflow