Как сохранить массив NumPy из рабочего PySpark в HDFS или общей файловой системе?

Я хотел бы сохранять / читать массивные массивы с / на рабочие машины (функция) в HDFS эффективно в PySpark. У меня есть две машины А и Б. А есть мастер и рабочий. У Б есть один работник. Например, я хотел бы достичь чего-то, как показано ниже:

if __name__ == "__main__":
    conf = SparkConf().setMaster("local").setAppName("Test")
    sc = SparkContext(conf = conf)
    sc.parallelize([0,1,2,3], 2).foreachPartition(func)

def func(iterator):
    P = << LOAD from HDFS or Shared Memory as numpy array>>
    for x in iterator:
        P = P + x

    << SAVE P (numpy array) to HDFS/ shared file system >>

Что может быть быстрым и эффективным методом для этого?

1 ответ

Я наткнулся на ту же проблему. и наконец использовал обходной путь, используя модуль HdfsCli и временные файлы с Python3.4.

  1. импорт:
from hdfs import InsecureClient
from tempfile import TemporaryFile
  1. создать клиента hdfs. В большинстве случаев лучше иметь вспомогательную функцию где-нибудь в вашем скрипте, например:
def get_hdfs_client():
    return InsecureClient("<your webhdfs uri>", user="<hdfs user>",
         root="<hdfs base path>")
  1. Загрузите и сохраните ваш NumPy внутри рабочей функции:
hdfs_client = get_hdfs_client()

# load from file.npy
path = "/whatever/hdfs/file.npy"
tf = TemporaryFile()

with hdfs_client.read(path) as reader:
    tf.write(reader.read())
    tf.seek(0) # important, set cursor to beginning of file

np_array = numpy.load(tf)

...

# save to file.npy
tf = TemporaryFile()
numpy.save(tf, np_array)
tf.seek(0) # important ! set the cursor to the beginning of the file
# with overwrite=False, an exception is thrown if the file already exists
hdfs_client.write("/whatever/output/file.npy", tf.read(),  overwrite=True) 

Заметки:

  • URI, используемый для создания клиента hdfs, начинается с http://потому что он использует веб-интерфейс файловой системы hdfs;
  • убедитесь, что пользователь, которого вы передаете клиенту hdfs, имеет права на чтение и запись
  • по моему опыту, накладные расходы не значительны (по крайней мере, с точки зрения времени выполнения)
  • преимущество использования временных файлов (по сравнению с обычными файлами в /tmp), что вы гарантируете, что файлы мусора не останутся на компьютерах кластера после завершения сценария, нормально или нет
Другие вопросы по тегам