Как сохранить массив NumPy из рабочего PySpark в HDFS или общей файловой системе?
Я хотел бы сохранять / читать массивные массивы с / на рабочие машины (функция) в HDFS эффективно в PySpark. У меня есть две машины А и Б. А есть мастер и рабочий. У Б есть один работник. Например, я хотел бы достичь чего-то, как показано ниже:
if __name__ == "__main__":
conf = SparkConf().setMaster("local").setAppName("Test")
sc = SparkContext(conf = conf)
sc.parallelize([0,1,2,3], 2).foreachPartition(func)
def func(iterator):
P = << LOAD from HDFS or Shared Memory as numpy array>>
for x in iterator:
P = P + x
<< SAVE P (numpy array) to HDFS/ shared file system >>
Что может быть быстрым и эффективным методом для этого?
1 ответ
Я наткнулся на ту же проблему. и наконец использовал обходной путь, используя модуль HdfsCli и временные файлы с Python3.4.
- импорт:
from hdfs import InsecureClient
from tempfile import TemporaryFile
- создать клиента hdfs. В большинстве случаев лучше иметь вспомогательную функцию где-нибудь в вашем скрипте, например:
def get_hdfs_client():
return InsecureClient("<your webhdfs uri>", user="<hdfs user>",
root="<hdfs base path>")
- Загрузите и сохраните ваш NumPy внутри рабочей функции:
hdfs_client = get_hdfs_client()
# load from file.npy
path = "/whatever/hdfs/file.npy"
tf = TemporaryFile()
with hdfs_client.read(path) as reader:
tf.write(reader.read())
tf.seek(0) # important, set cursor to beginning of file
np_array = numpy.load(tf)
...
# save to file.npy
tf = TemporaryFile()
numpy.save(tf, np_array)
tf.seek(0) # important ! set the cursor to the beginning of the file
# with overwrite=False, an exception is thrown if the file already exists
hdfs_client.write("/whatever/output/file.npy", tf.read(), overwrite=True)
Заметки:
- URI, используемый для создания клиента hdfs, начинается с
http://
потому что он использует веб-интерфейс файловой системы hdfs; - убедитесь, что пользователь, которого вы передаете клиенту hdfs, имеет права на чтение и запись
- по моему опыту, накладные расходы не значительны (по крайней мере, с точки зрения времени выполнения)
- преимущество использования временных файлов (по сравнению с обычными файлами в
/tmp
), что вы гарантируете, что файлы мусора не останутся на компьютерах кластера после завершения сценария, нормально или нет