Airflow PigOperator fails with "No such file or directory"

I can't find the mistake I made. The logs are shown below, along with the DAG, the connection, and the Pig script I created.

DAG:

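from airflow.operators import BashOperator, PigOperator  # Airflow 1.7-style imports
from airflow.models import DAG
from datetime import datetime

default_args = {
    'owner': 'hadoop',
    # Note: Airflow recommends against a dynamic start_date such as
    # datetime.now(); a fixed datetime is the usual choice.
    'start_date': datetime.now()
    }
dag = DAG(dag_id='ETL-DEMO',default_args=default_args,schedule_interval='@hourly')

# Task 1: sleep for 10 seconds, then echo a message
fly_task_1 = BashOperator(
            task_id='fly_task_1',
            bash_command='sleep 10 ; echo "fly_task_2"',
            dag=dag)
# Task 2: run the Pig script through the 'pig_cli' connection
fly_task_2 = PigOperator(
            task_id='fly_task_2',
            pig='/pig/sample.pig',
            pig_cli_conn_id='pig_cli',
            dag=dag)

# fly_task_2 runs only after fly_task_1 succeeds
fly_task_2.set_upstream(fly_task_1)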

Pig script:

rmf /onlyvinish/sample_out;
a_load = load '/onlyvinish/sample.txt' using PigStorage(',');
a_gen = foreach a_load generate (int)$0 as a;
b_gen = foreach a_gen generate a, a+1, a+2, a+3, a+4, a+5;
store b_gen into '/onlyvinish/sample_out' using PigStorage(',');

Connections: [screenshot]

Log for the failed task:

[2017-01-24 00:03:27,199] {models.py:168} INFO - Filling up the DagBag from /home/hadoop/airflow/dags/ETL.py
[2017-01-24 00:03:27,276] {jobs.py:2042} INFO - Subprocess PID is 8532
[2017-01-24 00:03:29,410] {models.py:168} INFO - Filling up the DagBag from /home/hadoop/airflow/dags/ETL.py
[2017-01-24 00:03:29,487] {models.py:1078} INFO - Dependencies all met for <TaskInstance: ETL-DEMO.fly_task_2 2017-01-24 00:03:07.199790 [queued]>
[2017-01-24 00:03:29,496] {models.py:1078} INFO - Dependencies all met for <TaskInstance: ETL-DEMO.fly_task_2 2017-01-24 00:03:07.199790 [queued]>
[2017-01-24 00:03:29,496] {models.py:1266} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2017-01-24 00:03:29,533] {models.py:1289} INFO - Executing <Task(PigOperator): fly_task_2> on 2017-01-24 00:03:07.199790
[2017-01-24 00:03:29,550] {pig_operator.py:64} INFO - Executing: rmf /onlyvinish/sample_out;

a_load = load '/onlyvinish/sample.txt' using PigStorage(',');

a_gen = foreach a_load generate (int)$0 as a;
b_gen = foreach a_gen generate a, a+1, a+2, a+3, a+4, a+5;

store b_gen into '/onlyvinish/sample_out' using PigStorage(',');
[2017-01-24 00:03:29,612] {pig_hook.py:67} INFO - pig -f /tmp/airflow_pigop_sm5bjE/tmpNP0ZXM
[2017-01-24 00:03:29,620] {models.py:1364} ERROR - [Errno 2] No such file or directory
Traceback (most recent call last):
  File "/home/hadoop/anaconda2/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/models.py", line 1321, in run
    result = task_copy.execute(context=context)
  File "/home/hadoop/anaconda2/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/operators/pig_operator.py", line 66, in execute
    self.hook.run_cli(pig=self.pig)
  File "/home/hadoop/anaconda2/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/hooks/pig_hook.py", line 72, in run_cli
    cwd=tmp_dir)
  File "/home/hadoop/anaconda2/lib/python2.7/subprocess.py", line 711, in __init__
    errread, errwrite)
  File "/home/hadoop/anaconda2/lib/python2.7/subprocess.py", line 1343, in _execute_child
    raise child_exception
OSError: [Errno 2] No such file or directory
[2017-01-24 00:03:29,623] {models.py:1388} INFO - Marking task as FAILED.
[2017-01-24 00:03:29,636] {models.py:1409} ERROR - [Errno 2] No such file or directory

  • Airflow: 1.7.2
  • Python: 2.7
  • RHEL: 6.7

Please let me know what I'm doing wrong.

The pig script should be passed as a templated string, not as the path to the Pig script itself.
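A minimal, Airflow-free sketch of what the log shows happening: the hook writes the pig string to a temporary file, logs a "pig -f <file>" command, and starts it with subprocess.Popen, which is where the traceback ends. Here the Pig script is kept as an inline string, as the answer suggests. The helper names (find_executable, build_pig_command, run_pig_script) are hypothetical and not part of Airflow; the sketch assumes a POSIX system. Popen raises OSError [Errno 2] when the program it is asked to start cannot be found, so the sketch turns that bare error into a message naming the missing executable and the PATH that was searched.

```python
import errno
import os
import shutil
import subprocess
import tempfile

# The Pig script from the question, kept as an inline string
# rather than referenced by file path.
SAMPLE_PIG = """\
rmf /onlyvinish/sample_out;
a_load = load '/onlyvinish/sample.txt' using PigStorage(',');
a_gen = foreach a_load generate (int)$0 as a;
b_gen = foreach a_gen generate a, a+1, a+2, a+3, a+4, a+5;
store b_gen into '/onlyvinish/sample_out' using PigStorage(',');
"""


def find_executable(name):
    """Hypothetical helper: return the full path of `name` on PATH, or None."""
    for directory in os.environ.get("PATH", "").split(os.pathsep):
        candidate = os.path.join(directory, name)
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate
    return None


def build_pig_command(script_path, pig_bin="pig"):
    """Hypothetical helper: the same shape as the logged 'pig -f <file>'."""
    return [pig_bin, "-f", script_path]


def run_pig_script(script, pig_bin="pig"):
    """Hypothetical helper: write `script` to a temp file and run `pig -f`.

    Raises RuntimeError if the Pig binary cannot be started, which is
    the situation that surfaces as OSError errno 2 in the traceback.
    """
    tmp_dir = tempfile.mkdtemp(prefix="pigop_")
    try:
        script_path = os.path.join(tmp_dir, "script.pig")
        with open(script_path, "w") as f:
            f.write(script)
        cmd = build_pig_command(script_path, pig_bin)
        try:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT, cwd=tmp_dir)
        except OSError as exc:
            if exc.errno == errno.ENOENT:
                raise RuntimeError("cannot start %r: not found on PATH=%s"
                                   % (pig_bin, os.environ.get("PATH", "")))
            raise
        output, _ = proc.communicate()
        return proc.returncode, output
    finally:
        # Remove the temporary directory and the script file inside it.
        shutil.rmtree(tmp_dir, ignore_errors=True)
```

Running find_executable("pig") as the same user and environment as the Airflow worker is one quick way to see whether the command from the log can be started at all.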