Python-код для балок с поддержкой hdfs в конвейере
Я запускаю пример Sentiment для преобразования тензорного потока. https://github.com/tensorflow/transform/blob/master/examples/sentiment_example.py
Для fn ReadAndShuffleData(), определенной в строке 78-98, возможно ли, что аналогичным образом я могу загружать файлы, но из HDFS, а не из GCS?
Я пробовал целый день с несколькими API лучей (beams-2.8.0), но потерпел неудачу, и я думаю, что наиболее перспективным является использование beams.io.hadoopfilesystem. Но этот fn фактически создает файл-объект python и не может быть прочитан с использованием beams.io.ReadFromText() в конвейере луча.
Я также передал в HadoopFileSystemPipelineOptions правильно. Кто-нибудь может показать мне направление для решения проблемы или 2/3-строчные фрагменты кода или обходной путь? Большое спасибо!
ps hadoop 2.7.7, лучи 2.8 и данные загружены правильно.
Я думаю, что мне может не хватить некоторого теоретического понимания здесь, любые ссылки будут оценены!
0 ответов
Вы можете использовать apache_beam.Create
преобразование:
Начальная подпись: beam.Create (self, values, reshuffle = True)
Docstring: преобразование, которое создает PCollection из итерируемого.
import apache_beam as beam
from apache_beam.options.pipeline_options import HadoopFileSystemOptions
from apache_beam.io.hadoopfilesystem import HadoopFileSystem
HDFS_HOSTNAME = 'foo.hadoop.com'
HDFS_PORT = 50070
hdfs_client_options = HadoopFileSystemOptions(hdfs_host=HDFS_HOSTNAME, hdfs_port=HDFS_PORT, hdfs_user="foobar")
hdfs_client = HadoopFileSystem(hdfs_client_options)
input_file_hdfs = "hdfs://foo/bar.csv"
f = hdfs_client.open(input_file_hdfs)
p = beam.Pipeline(options=PipelineOptions())
lines = p | 'ReadMyFile' >> beam.Create(f)
res = lines | "WriteMyFile" >> beam.io.WriteToText("./bar", ".csv")
p.run()