Описание тега apache-airflow-xcom
1
ответ
Xcom pull возвращает NameError на `ti`
Я пытаюсь вставить в xcom значение с ключом last_date в last_date_task а затем вытащить его из xcom во втором задании ga_wh_task, когда я проверю этот даг, запустив python dag.py он возвращает ошибку в этой строке, например: provide_context=True, si…
16 ноя '18 в 06:42
0
ответов
Apache Airflow получает информацию об исключении оператора в методе on_failure_callback
Привет, я новичок в Apache Airflow и использую on_failure_callback для обработки сбоя задачи. Но я хочу знать следующие вещи: Есть ли способ получить тип исключения, вызванного оператором в методе on_failure_callback? Это необходимо, потому что я хо…
10 окт '18 в 06:45
1
ответ
Ветви, созданные с помощью BranchPythonOperator, не объединяются?
Я использую BranchPythonOperator, чтобы сделать ветви в потоке воздуха. Мой вариант использования - мне нужно сделать две ветки от мейнстрима. Ветвь A (у которой мало задач) будет следовать, когда присутствует somefile.csv, в противном случае ветвь …
24 ноя '18 в 11:49
2
ответа
Импортировать переменную воздушного потока в PySpark
В последнее время я играю с Airflow и PySpark. Я видел, что Airflow имеет ряд переменных. Моя цель - проанализировать одну из этих переменных и импортировать ее в мой скрипт pySpark. До сих пор я пытался отобразить значение переменной (сработало), н…
12 дек '18 в 10:06
1
ответ
Передайте значение из одной DAG в другую DAG в Apache Airflow
У меня есть сценарий передачи имен файлов обработки между группами DAG, которые используются для загрузки моих данных. Есть ли функциональность для того же в потоке воздуха? Я надеюсь, что XCOM позволяет только передавать имена файлов между задачами…
03 янв '19 в 10:20
1
ответ
Воздушный поток, XCom и несколько task_ids
Как работает task_ids, если указано несколько задач? В этом конкретном примере кода я ожидал получить load_cycle_id_2 от обеих задач в кортеже (5555,22222), но вместо этого он вышел (None, 22222). Это почему? from airflow.models import DAG from airf…
22 дек '18 в 15:30
1
ответ
Создание нескольких задач в Airflow DAG для индивидуальной обработки
В моей DAG есть части, которые генерируют списки, которые я не могу разбить на отдельные задачи, которые будут обрабатываться по отдельности. Вот псевдо-пример: def push(**kwargs): # """Pushes an XCom without a specific target""" for n in range(10):…
25 фев '19 в 16:10
1
ответ
KeyError: 'ti' в Apache Airflow xcom
Мы пытаемся запустить простой DAG с двумя задачами, которые будут передавать данные через xcom. Файл DAG: from __future__ import print_function import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator from airf…
27 сен '18 в 17:00
0
ответов
Файл XCOM return.json извлекается из xcom-sidecar, а не из основного контейнера?
Я создал контейнер, который копирует файл return.json в /airflow/xcom/return.json Запустив этот контейнер на моей машине, вы сможете return.json, Однако, когда я запускаю этот контейнер в KubernetesPodOperator, он выдает ошибку file does not exist: …
24 янв '19 в 15:06
1
ответ
Доступ к родительскому контексту dag во время создания подтега в потоке воздуха?
Я пытаюсь получить доступ во время создания subdag к некоторым данным xcom из родительского dag, я искал, чтобы добиться этого в Интернете, но я не нашел что-то. def test(task_id): logging.info(f' execution of task {task_id}') def load_subdag(parent…
18 фев '19 в 10:49
1
ответ
Доступ к значению оператора воздушного потока вне оператора
Вне оператора мне нужно вызвать SubdagOperator и передать ему возвращаемое значение оператора, используя xcom. Я видел множество решений ( Airflow - как передать переменную xcom в функцию Python, как получить значение из Airflow XCom, отправленное ч…
17 окт '18 в 19:59
0
ответов
Датчик потока воздуха Airflow
Я пытаюсь найти, есть ли какие-либо файлы на удаленном сервере, соответствующие указанному шаблону. Что-то похожее на приведенное ниже решение Airflow File Sensor для определения файлов на моем локальном диске Я использовал SSHOperator с командой ba…
07 янв '19 в 14:47
2
ответа
Воздушный поток оператора k8s xcom - статус рукопожатия 403 запрещено
Когда я запускаю образ докера, используя KubernetesPodOperator в Airflow версии 1.10 Как только модуль успешно завершает задачу, airflow пытается получить значение xcom, установив соединение с модулем через потоковый клиент k8s. Вот ошибка, с которо…
18 дек '18 в 06:15
0
ответов
Проверьте один DAG, который делает сеть туда и обратно
У меня есть задача, которая читает список файлов из Azure и отправляет результаты в XCOM. Оператор конкретно AzureDataLakeStorageListOperator, Источник здесь: adls_list_operator.py Я хочу напечатать вывод этой задачи, используя что-то вроде BashOper…
11 дек '18 в 23:29
0
ответов
Имя файла воздушного потока из s3 с использованием s3KeySensor
Я использую Airflow, чтобы планировать свои задания на файлы с использованием AWS s3. До сих пор я смог запустить оператор python сразу после добавления файла в хранилище данных (используя s3KeySensor). Я хотел бы получить имя файла, который был доб…
14 фев '19 в 13:54
2
ответа
Воздушный поток принимает шаблон дзиндзя как строку
В Airflow я пытаюсь использовать шаблон jinja в airflow, но проблема в том, что он не анализируется, а рассматривается как строка. Пожалуйста, смотрите мой код `` from datetime import datetime from airflow.operators.python_operator import PythonOper…
26 ноя '18 в 07:33
0
ответов
Воздушный поток: передача данных в DAG через http
Я относительно новичок в воздушном потоке. Я создал DAG с простыми задачами. Из того, что я прочитал, я понял, что есть операторы для отправки запросов Http, но ни один не может получать и обрабатывать. Возможно ли это или есть какие-то обходные пут…
07 ноя '18 в 13:30
2
ответа
Не удалось извлечь xcom из модуля воздушных потоков - Kubernetes Pod Operator
Во время работы DAG, которая запускает флягу, используя образ докера,xcom_push = True, что создает еще один контейнер вместе с образом докера в одном модуле. DAG: jar_task = KubernetesPodOperator( namespace='test', image="path to image", image_pull_…
27 ноя '18 в 11:53
3
ответа
Я не могу xcom_push аргументы через BashOperator
Я новичок в Xcom функции Airflow. я попробовал это с PythonOperator, и он работал нормально (то есть я могу выдвинуть и извлечь значение из контекста), но когда я попробовал это на BashOperator, это не сработало. Однако я могу получить только послед…
11 фев '19 в 20:06
3
ответа
Воздушный поток: получить идентификатор предыдущей задачи в следующей задаче
У меня есть 2 задачи. В первом случае оператор python что-то вычисляет, а во втором я хочу использовать вывод оператора python в операторе Http. Вот мой код: source_list = ['account', 'sales'] for source_type in source_list: t2 = PythonOperator( tas…
26 май '19 в 00:33