Как использовать поток воздуха для обработки данных в реальном времени
У меня есть сценарий, где я хочу обработать CSV-файл и загрузить в другую базу данных:
случаи
- Pic CSV-файл и загрузить в MySQL с тем же именем, что и CSV
- затем внесите некоторые изменения в загруженные строки, используя файл задачи Python
- после этого извлеките данные из mysql и загрузите в другую базу данных
Файлы CSV поступают с удаленного сервера на один сервер воздушного потока в папке.
Мы должны выбрать этот CSV-файл и обработать его через скрипт Python.
Предположим, я выбрал один CSV-файл, а затем мне нужно передать этот CSV-файл оператору в зависимости, как
filename : abc.csv
task1 >> task2 >> task3 >>task4
Таким образом, abc.csv должен быть доступен для всех задач.
Пожалуйста, расскажите, как поступить.
1 ответ
Ваши сценарии не имеют ничего общего с реальным временем. Это глотание по расписанию / интервалу. Или, возможно, вы могли бы использовать SensorTask Operator для обнаружения доступности данных.
Реализуйте каждое из ваших требований в виде функций и вызывайте их из экземпляров операторов. Добавьте операторов в группу доступности базы данных с расписанием, соответствующим вашему входящему каналу.
Вы передаете и получаете доступ к params -kwargs python_callable при запуске оператора -context['param_key'] в методе execute при расширении шаблонов оператора -jinja
релевантно... параметр передачи воздушного потока из cli execute_date в воздушном потоке: необходимо получить доступ как переменную
Способ взаимодействия задач в Airflow использует XCOM, но он предназначен для небольших значений, а не для содержимого файла.
Если вы хотите, чтобы ваши задачи работали с тем же CSV-файлом, вы должны сохранить его в каком-то месте, а затем передать в XCOM путь к этому местоположению.
Мы используем LocalExecutor, поэтому для нас подходит локальная файловая система.
Мы решили создать папку для каждого ярлыка с названием. Внутри этой папки мы создаем папку для каждой даты выполнения (мы делаем это в первой задаче, которую мы всегда вызываем start_task
). Затем мы передаем путь к этой папке для последующих задач через Xcom.
Пример кода для start_task:
def start(share_path, **context):
execution_date_as_string = context['execution_date'].strftime(DATE_FORMAT)
execution_folder_path = os.path.join(share_path, 'my_dag_name', execution_date_as_string)
_create_folder_delete_if_exists(execution_folder_path)
task_instance = context['task_instance']
task_instance.xcom_push(key="execution_folder_path", value=execution_folder_path)
start_task = PythonOperator(
task_id='start_task',
provide_context=True,
python_callable=start,
op_args=[share_path],
dag=dag
)
share_path
является базовым каталогом для всех пакетов, мы храним его в переменных Airflow.
Последующие задачи могут получить папку выполнения с:
execution_folder_path = task_instance.xcom_pull(task_ids='start_task', key='execution_folder_path')