Sample code needed for Luigi: the HdfsTarget module

I am new to Luigi.

I have the sample code below, but it fails with an error when I run it. We use Python 3 and the HdfsTarget object to connect to HDFS on name-node-1, and I run this code from my local machine.

import os
import luigi
import luigi.contrib.hdfs

from luigi.contrib import webhdfs

class TestWebHdfs(luigi.Task):

    """
    This test requires a running Hadoop cluster with WebHdfs enabled.
    This test requires the luigi.cfg file to have a `hdfs` section
    with the namenode_host, namenode_port and user settings.
    """

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget("tmp/words.txt")

    def run(self):

        words = [

        with self.output().open('w') as f:
            for word in words:

if __name__ == '__main__':

Here is my luigi.cfg configuration:

[core]
default-scheduler-host=name-node-1
default-scheduler-port=8082
default-scheduler-url=http://name-node-1:8082/luigi/
hdfs-tmp-dir=/tmp
log_level=DEBUG

[hdfs]
snakebite_autoconfig=False
namenode_host=name-node-1
namenode_port=50070
effective_user=admin
client=hadoopcli

[hadoop]
command=/usr/hdp/
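For reference, the `[hdfs]` values above are the kind of settings a WebHDFS client would use. The sketch below parses an equivalent section with Python's standard `configparser` and builds the WebHDFS REST URL for a file status check. It is only an illustration: `webhdfs_status_url` is a hypothetical helper, not part of Luigi, and it assumes that port 50070 serves the WebHDFS REST API on this cluster, which the question does not confirm.

```python
import configparser
from urllib.parse import quote, urlencode

# Copy of the [hdfs] values from the question, used here only as sample input.
CONFIG_TEXT = """
[hdfs]
namenode_host=name-node-1
namenode_port=50070
effective_user=admin
"""


def webhdfs_status_url(config_text, path):
    """Build a WebHDFS GETFILESTATUS URL from an [hdfs] section (hypothetical helper)."""
    if not path.startswith("/"):
        # This sketch only handles absolute HDFS paths.
        raise ValueError("path must be absolute")
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    hdfs = parser["hdfs"]
    query = urlencode({"op": "GETFILESTATUS", "user.name": hdfs["effective_user"]})
    return "http://{}:{}/webhdfs/v1{}?{}".format(
        hdfs["namenode_host"], hdfs["namenode_port"], quote(path), query
    )


print(webhdfs_status_url(CONFIG_TEXT, "/tmp/words.txt"))
```

Opening the printed URL from the local machine (for example with a browser or `curl`) would show whether the namenode answers WebHDFS requests at all, independently of the `hadoopcli` client configured above.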


DEBUG: Checking if TestWebHdfs() is complete
DEBUG: Running file existence check: /usr/hdp/ fs -stat tmp/words.txt
WARNING: Will not run TestWebHdfs() or any dependencies due to error in complete() method:
Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/luigi/worker.py", line 401, in check_complete
    is_complete = task.complete()
  File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/luigi/task.py", line 573, in complete
    return all(map(lambda output: output.exists(), outputs))
  File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/luigi/task.py", line 573, in <lambda>
    return all(map(lambda output: output.exists(), outputs))
  File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/luigi/target.py", line 243, in exists
    return self.fs.exists(path)
  File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/luigi/contrib/hdfs/hadoopcli_clients.py", line 78, in exists
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, universal_newlines=True)
  File "/home/ubuntu/anaconda3/lib/python3.7/subprocess.py", line 769, in __init__
    restore_signals, start_new_session)
  File "/home/ubuntu/anaconda3/lib/python3.7/subprocess.py", line 1516, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: '/usr/hdp/': '/usr/hdp/'
INFO: Informed scheduler that task   TestWebHdfs__99914b932b   has status   UNKNOWN
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
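
The log shows that the `hadoopcli` client ran `/usr/hdp/ fs -stat tmp/words.txt`, so the `[hadoop] command` value was used as the program to execute, and `subprocess` could not find it on the machine running the task. A quick way to check a candidate value on that machine is sketched below. It assumes that the first whitespace-separated token of the value is the executable; `resolve_hadoop_command` is a hypothetical helper, not part of Luigi.

```python
import shutil
import sys


def resolve_hadoop_command(command):
    """Return the executable that a `[hadoop] command` value resolves to, or None.

    Hypothetical helper: treats the first whitespace-separated token as the
    program name, which is an assumption about how the value is used.
    """
    parts = command.split()
    if not parts:
        return None
    # shutil.which returns None for missing paths, directories,
    # and files that are not executable.
    return shutil.which(parts[0])


# Compare the value from the question with a path that is known to be executable.
for candidate in ("/usr/hdp/", sys.executable):
    print(candidate, "->", resolve_hadoop_command(candidate))
```

If the helper returns `None` for the configured value, the `FileNotFoundError` above is the expected outcome. The `command` setting would then need to name the actual `hadoop` executable, wherever it is installed on the machine that runs the task.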

