Ошибка воздушного потока при импорте DAG с помощью плагина - отношения могут быть установлены только между операторами

Я написал плагин airflow, который просто содержит один пользовательский оператор (для поддержки CMEK в BigQuery). Я могу создать простую группу обеспечения доступности баз данных с одной задачей, в которой используется этот оператор и которая прекрасно работает.

Однако, если я пытаюсь создать зависимость в группе обеспечения доступности баз данных от задачи DummyOperator до задачи пользовательского оператора, DAG не удается загрузить в пользовательском интерфейсе и выдает следующую ошибку, и я не могу понять, почему эта ошибка возникает?

Broken DAG: [/home/airflow/gcs/dags/js_bq_custom_plugin_v2.py] Отношения могут быть установлены только между операторами; получил BQCMEKOperator

До сих пор я проверял это на composer-1.4.2-airflow-1.9.0, composer-1.4.2-airflow-1.10.0 и composer-1.4.1-airflow-1.10.0.

Запуск теста воздушного потока для каждой из задач завершается без ошибок.

Использование его изолированно в DAG работает нормально (как показано ниже), поэтому я не верю, что с плагином что-то не так по своей сути

import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator


default_dag_args = {
    'start_date': datetime.datetime(2019,1,1),
    'retries': 0
}


dag = DAG(
    'js_bq_custom_plugin',
    schedule_interval=None,
    catchup=False,
    concurrency=1,
    max_active_runs=1,
    default_args=default_dag_args)

run_this = BQCMEKOperator(
    task_id     = 'cmek_plugin_test',
    sql         = 'select * from ds.foo LIMIT 15',
    project     = 'xxx',
    dataset     = 'js_dev',
    table       = 'cmek_test10',
    key         = 'xxx',
    dag     = dag
)

Принимая во внимание, что если я ввожу DummyOperator и зависимость, то возникает ошибка

import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator
from airflow.operators.dummy_operator import DummyOperator

default_dag_args = {
    'start_date': datetime.datetime(2019,1,1),
    'retries': 0
}

dag = DAG(
    'js_bq_custom_plugin_v2',
    schedule_interval=None,
    catchup=False,
    concurrency=1,
    max_active_runs=1,
    default_args=default_dag_args)

etl_start = DummyOperator(task_id='etl_start', dag=dag)

extract = BQCMEKOperator(
    task_id     = 'extract',
    sql         = 'select * from foo.bar LIMIT 15',
    project     = 'xxx',
    dataset     = 'js_dev',
    table       = 'cmek_test5',
    key         = 'xxx',
    dag         = dag
)

etl_start.set_downstream(extract)

Сам оператор прост, и я могу воспроизвести проблему с помощью самого простого пользовательского оператора, такого как приведенный ниже.

import logging
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class TestOperator(BaseOperator):

    @apply_defaults
    def __init__(self,
                *args,
                **kwargs):
        super(TestOperator, self).__init__(*args, **kwargs)


    def execute(self, context):
        logging.info("Executed by TestOperator")

Со следующим определением плагина в init.py

from airflow.plugins_manager import AirflowPlugin
from test_plugin.operators.test_operator import TestOperator

class TestPlugin(AirflowPlugin):
    name = "test_plugin"
    operators = [TestOperator]
    hooks = []
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []

Также, посмотрев на код воздушного потока в models.py, который генерирует эту ошибку, он использует isinstance(t, BaseOperator), и он возвращает true для моей задачи, используя мой пользовательский оператор, когда я просто запускаю его в python, поэтому я понятия не имею, что происходит?

for t in task_list:
    if not isinstance(t, BaseOperator):
        raise AirflowException(
            "Relationships can only be set between "
            "Operators; received {}".format(t.__class__.__name__))

1 ответ

Решение

В выпуске composer-1.4.2 была обнаружена ошибка, которую мы уже исправили, попробуйте создать новую среду Composer, и ошибка DAG должна исчезнуть. Между тем, мы также применим это исправление автоматически к существующим средам 1.4.2 в течение следующих нескольких дней.

Другие вопросы по тегам