Эффективный метод копирования в Hadoop
Существует ли более быстрый или более эффективный способ копирования файлов через HDFS, кроме distcp
, Я попробовал как обычный hadoop fs -cp
так же как distcp
и оба, кажется, дают одинаковую скорость передачи, около 50 Мбит / с.
У меня есть 5 ТБ данных, разбитых на более мелкие файлы по 500 ГБ каждый, которые я должен скопировать в новое место на HDFS. Какие-нибудь мысли?
Редактировать: оригинал distcp
порождает только 1 маппер, поэтому я добавил -m100
возможность увеличения картографов
hadoop distcp -D mapred.job.name="Gigafiles distcp" -pb -i -m100 "/user/abc/file1" "/xyz/aaa/file1"
Но все же это порождает только 1, а не 100 картостроителей. Я что-то здесь упускаю?
1 ответ
Я придумал это, если вы хотите скопировать подмножество файлов из одной папки в другую в HDFS. Это может быть не так эффективно, какdistcp
но выполняет свою работу и дает вам больше свободы на случай, если вы захотите выполнить другие операции. Он также проверяет, существует ли там каждый файл:
import pandas as pd
import os
from multiprocessing import Process
from subprocess import Popen, PIPE
hdfs_path_1 = '/path/to/the/origin/'
hdfs_path_2 = '/path/to/the/destination/'
process = Popen(f'hdfs dfs -ls -h {hdfs_path_2}', shell=True, stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
already_processed = [fn.split()[-1].split('/')[-1] for fn in std_out.decode().readlines()[1:]][:-1]
print(f'Total number of ALREADY PROCESSED tar files = {len(already_processed)}')
df = pd.read_csv("list_of_files.csv") # or any other lists that you have
to_do_tar_list = list(df.tar)
to_do_list = set(to_do_tar_list) - set(already_processed)
print(f'To go: {len(to_do_list)}')
def copyy(f):
process = Popen(f'hdfs dfs -cp {hdfs_path_1}{f} {hdfs_path_2}', shell=True, stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
if std_out!= b'':
print(std_out)
ps = []
for f in to_do_list:
p = Process(target=copyy, args=(f,))
p.start()
ps.append(p)
for p in ps:
p.join()
print('done')
Также, если вы хотите получить список всех файлов в каталоге, используйте это:
from subprocess import Popen, PIPE
hdfs_path = '/path/to/the/designated/folder'
process = Popen(f'hdfs dfs -ls -h {hdfs_path}', shell=True, stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
list_of_file_names = [fn.split(' ')[-1].split('/')[-1] for fn in std_out.decode().readlines()[1:]][:-1]
list_of_file_names_with_full_address = [fn.split(' ')[-1] for fn in std_out.decode().readlines()[1:]][:-1]
Я смог решить эту проблему с помощью сценария Pig для чтения данных из пути A, преобразования в паркет (что в любом случае является желаемым форматом хранения) и записи его в путь B. Процесс занимал в среднем около 20 минут для файлов объемом 500 ГБ., Спасибо за предложения.