Эффективный метод копирования в Hadoop

Существует ли более быстрый или более эффективный способ копирования файлов через HDFS, кроме distcp, Я попробовал как обычный hadoop fs -cp так же как distcp и оба, кажется, дают одинаковую скорость передачи, около 50 Мбит / с.

У меня есть 5 ТБ данных, разбитых на более мелкие файлы по 500 ГБ каждый, которые я должен скопировать в новое место на HDFS. Какие-нибудь мысли?

Редактировать: оригинал distcp порождает только 1 маппер, поэтому я добавил -m100 возможность увеличения картографов

hadoop distcp -D mapred.job.name="Gigafiles distcp" -pb -i -m100 "/user/abc/file1" "/xyz/aaa/file1"

Но все же это порождает только 1, а не 100 картостроителей. Я что-то здесь упускаю?

1 ответ

Я придумал это, если вы хотите скопировать подмножество файлов из одной папки в другую в HDFS. Это может быть не так эффективно, какdistcpно выполняет свою работу и дает вам больше свободы на случай, если вы захотите выполнить другие операции. Он также проверяет, существует ли там каждый файл:

import pandas as pd
import os
from multiprocessing import Process
from subprocess import Popen, PIPE
hdfs_path_1 = '/path/to/the/origin/'
hdfs_path_2 = '/path/to/the/destination/'
process = Popen(f'hdfs dfs -ls -h {hdfs_path_2}', shell=True, stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
already_processed = [fn.split()[-1].split('/')[-1] for fn in std_out.decode().readlines()[1:]][:-1]
print(f'Total number of ALREADY PROCESSED tar files = {len(already_processed)}')

df = pd.read_csv("list_of_files.csv")  # or any other lists that you have
to_do_tar_list = list(df.tar)
to_do_list = set(to_do_tar_list) - set(already_processed)
print(f'To go: {len(to_do_list)}')

def copyy(f):
    process = Popen(f'hdfs dfs -cp {hdfs_path_1}{f} {hdfs_path_2}', shell=True, stdout=PIPE, stderr=PIPE)
    std_out, std_err = process.communicate()
    if std_out!= b'':
        print(std_out)

ps = []
for f in to_do_list:
    p = Process(target=copyy, args=(f,))
    p.start()
    ps.append(p)
for p in ps:
    p.join()
print('done')

Также, если вы хотите получить список всех файлов в каталоге, используйте это:

from subprocess import Popen, PIPE
hdfs_path = '/path/to/the/designated/folder'
process = Popen(f'hdfs dfs -ls -h {hdfs_path}', shell=True, stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
list_of_file_names = [fn.split(' ')[-1].split('/')[-1] for fn in std_out.decode().readlines()[1:]][:-1]
list_of_file_names_with_full_address = [fn.split(' ')[-1] for fn in std_out.decode().readlines()[1:]][:-1]

Я смог решить эту проблему с помощью сценария Pig для чтения данных из пути A, преобразования в паркет (что в любом случае является желаемым форматом хранения) и записи его в путь B. Процесс занимал в среднем около 20 минут для файлов объемом 500 ГБ., Спасибо за предложения.

Другие вопросы по тегам