Python-код для балок с поддержкой hdfs в конвейере

Я запускаю пример Sentiment для преобразования тензорного потока. https://github.com/tensorflow/transform/blob/master/examples/sentiment_example.py

Для fn ReadAndShuffleData(), определенной в строке 78-98, возможно ли, что аналогичным образом я могу загружать файлы, но из HDFS, а не из GCS?

Я пробовал целый день с несколькими API лучей (beams-2.8.0), но потерпел неудачу, и я думаю, что наиболее перспективным является использование beams.io.hadoopfilesystem. Но этот fn фактически создает файл-объект python и не может быть прочитан с использованием beams.io.ReadFromText() в конвейере луча.

Я также передал в HadoopFileSystemPipelineOptions правильно. Кто-нибудь может показать мне направление для решения проблемы или 2/3-строчные фрагменты кода или обходной путь? Большое спасибо!

ps hadoop 2.7.7, лучи 2.8 и данные загружены правильно.

Я думаю, что мне может не хватить некоторого теоретического понимания здесь, любые ссылки будут оценены!

0 ответов

Вы можете использовать apache_beam.Create преобразование:

Начальная подпись: beam.Create (self, values, reshuffle = True)

Docstring: преобразование, которое создает PCollection из итерируемого.

import apache_beam as beam
from apache_beam.options.pipeline_options import HadoopFileSystemOptions
from apache_beam.io.hadoopfilesystem import HadoopFileSystem

HDFS_HOSTNAME = 'foo.hadoop.com'
HDFS_PORT = 50070
hdfs_client_options = HadoopFileSystemOptions(hdfs_host=HDFS_HOSTNAME, hdfs_port=HDFS_PORT, hdfs_user="foobar")
hdfs_client = HadoopFileSystem(hdfs_client_options)

input_file_hdfs = "hdfs://foo/bar.csv"
f = hdfs_client.open(input_file_hdfs)

p = beam.Pipeline(options=PipelineOptions())
lines = p | 'ReadMyFile' >> beam.Create(f)
res = lines | "WriteMyFile" >> beam.io.WriteToText("./bar", ".csv")
p.run()
Другие вопросы по тегам