Как использовать поток воздуха для обработки данных в реальном времени

У меня есть сценарий, где я хочу обработать CSV-файл и загрузить в другую базу данных:

случаи

  1. Pic CSV-файл и загрузить в MySQL с тем же именем, что и CSV
  2. затем внесите некоторые изменения в загруженные строки, используя файл задачи Python
  3. после этого извлеките данные из mysql и загрузите в другую базу данных

Файлы CSV поступают с удаленного сервера на один сервер воздушного потока в папке.

Мы должны выбрать этот CSV-файл и обработать его через скрипт Python.

Предположим, я выбрал один CSV-файл, а затем мне нужно передать этот CSV-файл оператору в зависимости, как

filename : abc.csv

task1 >> task2 >> task3 >>task4

Таким образом, abc.csv должен быть доступен для всех задач.

Пожалуйста, расскажите, как поступить.

1 ответ

Ваши сценарии не имеют ничего общего с реальным временем. Это глотание по расписанию / интервалу. Или, возможно, вы могли бы использовать SensorTask Operator для обнаружения доступности данных.

Реализуйте каждое из ваших требований в виде функций и вызывайте их из экземпляров операторов. Добавьте операторов в группу доступности базы данных с расписанием, соответствующим вашему входящему каналу.

Вы передаете и получаете доступ к params -kwargs python_callable при запуске оператора -context['param_key'] в методе execute при расширении шаблонов оператора -jinja

релевантно... параметр передачи воздушного потока из cli execute_date в воздушном потоке: необходимо получить доступ как переменную

Способ взаимодействия задач в Airflow использует XCOM, но он предназначен для небольших значений, а не для содержимого файла.

Если вы хотите, чтобы ваши задачи работали с тем же CSV-файлом, вы должны сохранить его в каком-то месте, а затем передать в XCOM путь к этому местоположению.

Мы используем LocalExecutor, поэтому для нас подходит локальная файловая система.

Мы решили создать папку для каждого ярлыка с названием. Внутри этой папки мы создаем папку для каждой даты выполнения (мы делаем это в первой задаче, которую мы всегда вызываем start_task). Затем мы передаем путь к этой папке для последующих задач через Xcom.

Пример кода для start_task:

def start(share_path, **context):
    execution_date_as_string = context['execution_date'].strftime(DATE_FORMAT)    
    execution_folder_path = os.path.join(share_path, 'my_dag_name', execution_date_as_string)
    _create_folder_delete_if_exists(execution_folder_path)
    task_instance = context['task_instance']
    task_instance.xcom_push(key="execution_folder_path", value=execution_folder_path)

start_task = PythonOperator(
    task_id='start_task',
    provide_context=True,
    python_callable=start,
    op_args=[share_path],
    dag=dag
)

share_path является базовым каталогом для всех пакетов, мы храним его в переменных Airflow.

Последующие задачи могут получить папку выполнения с:

execution_folder_path = task_instance.xcom_pull(task_ids='start_task', key='execution_folder_path')
Другие вопросы по тегам