Команда bash, в то время как цикл запускается только один раз в kubernetespodoperator Airflow

Я пытаюсь запустить одно задание pod Kubernetes 6 раз. Каждый раз он будет печатать число и спать в течение 5 секунд. Однако он запускается только один раз, а затем останавливается. Вот полный код файла dag:

      from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "lab",
    "depends_on_past": False,
    "start_date": days_ago(0),
    "catchup": False,
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "dag_executes_6_times",
    default_args=default_args,
    max_active_runs=1,
    concurrency=10,
)
# use a kube_config stored in s3 dags folder for now
kube_config_path = "/usr/local/airflow/dags/kube_config.yaml"

# Generate 2 tasks
tasks = ["task{}".format(i) for i in range(1, 3)]
example_dag_complete_node = DummyOperator(task_id="example_dag_complete", dag=dag)

org_dags = []
for task in tasks:

    bash_command = "echo HELLO"

    org_node = KubernetesPodOperator(
        namespace="default-airflow",
        image="bash",
        cmds=["bash", "-c"],
        arguments=[
            "bash",
            "-c",
            "i=0;while true;do echo '$i' && ((i++>5)) && break && sleep 5;done",
        ],
        labels={"foo": "bar"},
        image_pull_policy="Always",
        name=task,
        task_id=task,
        is_delete_operator_pod=False,
        get_logs=True,
        dag=dag,
        config_file=kube_config_path,
        in_cluster=False,
        cluster_context="lab",
    )

    org_node.set_downstream(example_dag_complete_node)

Это будет выглядеть следующим образом:

По сути, я хочу, чтобы статус отображался как запущенный для всего процесса, а не только один раз. Подскажите и заранее спасибо!

1 ответ

Цикл выполняется 6 раз, он просто не спит, поэтому сразу же завершается. Проблема в том, что внутри цикла:

      echo '$i' && ((i++>5)) && break && sleep 5

В sleepкоманда связана с остальной частью командной строки с помощью &&, что означает, что он будет работать только в том случае, если остальная часть командной строки будет выполнена успешно. ((i++>5))и команда (которая в любом случае предотвращает ее запуск). Вы хотите связать его с ;вместо этого, чтобы он выполнялся безоговорочно (хотя breakпропустит его на последней итерации). Кроме того, поскольку '$i'находится в одинарных кавычках, оно не будет заменено значением переменной, просто напечатано буквально; вы хотите двойные кавычки. Вот что внутри whileцикл должен выглядеть так:

      echo "$i" && ((i++>5)) && break; sleep 5

(Примечание: вам придется соответствующим образом экранировать эти двойные кавычки.)

Другие вопросы по тегам