Датчик потока воздуха для определения файлов на моем локальном диске

У кого-нибудь есть идеи по FileSensor? Я прошел через это, когда я искал файлы на моем локальном каталоге. Код выглядит следующим образом:

  task= FileSensor(
    task_id="senseFile"
    filepath="etc/hosts",
    fs_conn_id='fs_local',
   _hook=self.hook,
    dag=self.dag,)

Я также установил мои conn_id и тип conn как File (путь) и дал {'path':'mypath'}, но даже если я установил несуществующий путь или если файл не указан в указанном пути, задача выполнена и даг успешен. FileSensor, кажется, не чувствует файлы вообще.

1 ответ

Решение

Я нашел, что сообщество внесло свой вклад в FileSenor, но написал свой собственный.

Я установил, что он работает для файлов локально, там, где работал сервер / планировщик, однако при использовании сетевых путей возникли проблемы.

Уловка для сетевых путей, которую я нашел, состояла в том, чтобы подключить сетевой диск к моей Linux Box.

Это мой DAG, используемый для sensor_task >> proccess_task >> archive_task >> триггер перезапуска

Примечание: мы используем переменные (sourcePath, filePattern & archivePath), введенные через WebGUI

    from airflow import DAG
    from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator, TriggerDagRunOperator
    from datetime import datetime, timedelta
    from airflow.models import Variable

    default_args = {
        'owner': 'glsam',
        'depends_on_past': False,
        'start_date': datetime(2017, 6, 26),
        'provide_context': True,
        'retries': 100,
        'retry_delay': timedelta(seconds=30)
    }

    task_name = 'my_first_file_sensor_task'
    filepath = Variable.get("soucePath")
    filepattern = Variable.get("filePattern")
    archivepath = Variable.get("archivePath")

    dag = DAG(
        'task_name',
        default_args=default_args,
        schedule_interval=None,
        catchup=False,
        max_active_runs=1,
        concurrency=1)

    sensor_task = OmegaFileSensor(
        task_id=task_name,
        filepath=filepath,
        filepattern=filepattern,
        poke_interval=3,
        dag=dag)


    def process_file(**context):
        file_to_process = context['task_instance'].xcom_pull(
            key='file_name', task_ids=task_name)
        file = open(filepath + file_to_process, 'w')
        file.write('This is a test\n')
        file.write('of processing the file')
        file.close()


    proccess_task = PythonOperator(
        task_id='process_the_file', python_callable=process_file, dag=dag)

    archive_task = ArchiveFileOperator(
        task_id='archive_file',
        filepath=filepath,
        task_name=task_name,
        archivepath=archivepath,
        dag=dag)

    trigger = TriggerDagRunOperator(
        task_id='trigger_dag_rerun', trigger_dag_id=task_name, dag=dag)

    sensor_task >> proccess_task >> archive_task >> trigger

И тогда это мой FileSenor

    import os
    import re

    from datetime import datetime
    from airflow.models import BaseOperator
    from airflow.plugins_manager import AirflowPlugin
    from airflow.utils.decorators import apply_defaults
    from airflow.operators.sensors import BaseSensorOperator


    class ArchiveFileOperator(BaseOperator):
        @apply_defaults
        def __init__(self, filepath, archivepath, task_name, *args, **kwargs):
            super(ArchiveFileOperator, self).__init__(*args, **kwargs)
            self.filepath = filepath
            self.archivepath = archivepath
            self.task_name = task_name

        def execute(self, context):
            file_name = context['task_instance'].xcom_pull(self.task_name, key='file_name')
            os.rename(self.filepath + file_name, self.archivepath + file_name)


    class OmegaFileSensor(BaseSensorOperator):
        @apply_defaults
        def __init__(self, filepath, filepattern, *args, **kwargs):
            super(OmegaFileSensor, self).__init__(*args, **kwargs)
            self.filepath = filepath
            self.filepattern = filepattern

        def poke(self, context):
            full_path = self.filepath
            file_pattern = re.compile(self.filepattern)

            directory = os.listdir(full_path)

            for files in directory:
                if not re.match(file_pattern, files):
                    # do nothing
                else:
                    context['task_instance'].xcom_push('file_name', files)
                    return True
            return False

    class OmegaPlugin(AirflowPlugin):
        name = "omega_plugin"
        operators = [OmegaFileSensor, ArchiveFileOperator]

Полный отказ от ответственности: я новичок в Python, так что это может быть довольно уродливый код, но он работает

Другие вопросы по тегам