Воздушный поток DAG не идет по расписанию
Я новичок в Airflow и создал свой первый DAG. Вот мой код DAG. Я хочу, чтобы DAG запускался сейчас, а затем запускался один раз в день.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.now(),
'email': ['aaaa@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'alamode', default_args=default_args, schedule_interval=timedelta(1))
create_command = "/home/ubuntu/scripts/makedir.sh "
# t1 is the task which will invoke the directory creation shell script
t1 = BashOperator(
task_id='create_directory',
bash_command=create_command,
dag=dag)
run_spiders = "/home/ubuntu/scripts/crawl_spiders.sh "
# t2 is the task which will invoke the spiders
t2 = BashOperator(
task_id='web_scrawl',
bash_command=run_spiders,
dag=dag)
# To set dependency between tasks. 't1' should run before t2
t2.set_upstream(t1)
DAG не выбирается Airflow. Я проверил журнал и вот что он говорит.
[2017-09-12 18:08:20,220] {jobs.py:343} DagFileProcessor398 INFO - Started process (PID=7001) to work on /home/ubuntu/airflow/dags/alamode.py
[2017-09-12 18:08:20,223] {jobs.py:1521} DagFileProcessor398 INFO - Processing file /home/ubuntu/airflow/dags/alamode.py for tasks to queue
[2017-09-12 18:08:20,223] {models.py:167} DagFileProcessor398 INFO - Filling up the DagBag from /home/ubuntu/airflow/dags/alamode.py
[2017-09-12 18:08:20,262] {jobs.py:1535} DagFileProcessor398 INFO - DAG(s) ['alamode'] retrieved from /home/ubuntu/airflow/dags/alamode.py
[2017-09-12 18:08:20,291] {jobs.py:1169} DagFileProcessor398 INFO - Processing alamode
/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate. Consider alternative strategies for improved performance.
'strategies for improved performance.' % expr)
[2017-09-12 18:08:20,317] {models.py:322} DagFileProcessor398 INFO - Finding 'running' jobs without a recent heartbeat
[2017-09-12 18:08:20,318] {models.py:328} DagFileProcessor398 INFO - Failing jobs without heartbeat after 2017-09-12 18:03:20.318105
[2017-09-12 18:08:20,320] {jobs.py:351} DagFileProcessor398 INFO - Processing /home/ubuntu/airflow/dags/alamode.py took 0.100 seconds
Что именно я делаю не так? Я попытался изменить schedule_interval на schedule_interval=timedelta(minutes=1), чтобы увидеть, запускается ли он немедленно, но все равно бесполезно. Я вижу задачи под DAG, как и ожидалось, в пользовательском интерфейсе Airflow, но со статусом расписания "Нет статуса". Пожалуйста, помогите мне здесь.
1 ответ
Эта проблема была решена с помощью следующих шагов:
1) Я использовал гораздо более старую дату для start_date и schedule_interval=timedelta(minutes=10). Кроме того, вместо datetime.now() используется реальная дата.
2) Добавлен catchup = True в аргументах DAG.
3) Настройте переменную среды как экспорт AIRFLOW_HOME =pwd
/ Airflow_home.
4) Удален airflow.db
5) Перенес новый код в папку DAGS
6) Запустите команду 'airflow initdb', чтобы снова создать БД.
7) Включил переключатель "ВКЛ" моего DAG через интерфейс
8) Запустил команду "Планировщик воздушного потока"
Вот код, который работает сейчас:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 9, 12),
'email': ['anjana@gapro.tech'],
'retries': 0,
'retry_delay': timedelta(minutes=15)
}
dag = DAG(
'alamode', catchup=False, default_args=default_args, schedule_interval="@daily")
# t1 is the task which will invoke the directory creation shell script
t1 = BashOperator(
task_id='create_directory',
bash_command='/home/ubuntu/scripts/makedir.sh ',
dag=dag)
# t2 is the task which will invoke the spiders
t2 = BashOperator(
task_id= 'web_crawl',
bash_command='/home/ubuntu/scripts/crawl_spiders.sh ',
dag=dag)
# To set dependency between tasks. 't1' should run before t2
t2.set_upstream(t1)