Лучший способ записать огромное количество файлов
Я пишу много файлов, как показано ниже.
public void call(Iterator<Tuple2<Text, BytesWritable>> arg0)
throws Exception {
// TODO Auto-generated method stub
while (arg0.hasNext()) {
Tuple2<Text, BytesWritable> tuple2 = arg0.next();
System.out.println(tuple2._1().toString());
PrintWriter writer = new PrintWriter("/home/suv/junk/sparkOutPut/"+tuple2._1().toString(), "UTF-8");
writer.println(new String(tuple2._2().getBytes()));
writer.close();
}
}
Есть ли лучший способ для записи файлов.. без закрытия или создания печатающего устройства каждый раз.
2 ответа
Нет лучшего способа записи большого количества файлов. То, что вы делаете, по сути является интенсивным вводом / выводом.
ОБНОВЛЕНИЕ - @ Майкл Андерсон прав, я думаю. Использование нескольких потоков для записи файлов (вероятно) значительно ускорит процесс. Тем не менее, ввод-вывод по-прежнему будет самым узким местом в нескольких отношениях:
Создание, открытие и закрытие файлов включает доступ и обновление метаданных файлов и каталогов. Это влечет за собой нетривиальный процессор.
Данные файла и изменения метаданных должны быть записаны на диск. Это возможно несколько записей на диск.
Для каждого записанного файла есть как минимум 3 системных вызова.
Тогда есть нити, накладывающие накладные расходы.
Если количество данных, записываемых в каждый файл, не является значительным (несколько килобайт на файл), я сомневаюсь, что такие методы, как использование NIO, прямые буферы, JNI и т. Д., Будут полезны. Реальные узкие места будут в ядре: операции с файловой системой и низкоуровневый дисковый ввод-вывод.
... не закрывая и не создавая принтер каждый раз.
Вам нужно создать новый PrintWriter
(или же Writer
или же OutputStream
) для каждого файла.
Тем не менее, это...
writer.println(new String(tuple2._2().getBytes()));
... выглядит довольно своеобразно. Вы кажетесь:
- призвание
getBytes()
наString
(?), - преобразование байтового массива в
String
- вызывая
println()
метод наString
который скопирует это, и преобразует это обратно в байты прежде, чем наконец вывести их.
Что дает? Какой смысл преобразования String -> bytes -> String?
Я бы просто сделал это:
writer.println(tuple2._2());
Это должно быть быстрее, хотя я бы не ожидал, что процентное ускорение будет таким большим.
Я предполагаю, что вы выбрали самый быстрый способ. Потому что все знают, что самое быстрое - это лучшее;)
Один простой способ - это использовать кучу потоков, чтобы писать за вас. Однако вы не получите большой выгоды от этого, если ваша файловая система не будет хорошо масштабироваться. (Я использую эту технику в кластерных системах на основе Luster, и в случаях, когда "много файлов" может означать 10 тыс. - в этом случае многие записи будут выполняться на разных серверах / дисках)
Код будет выглядеть примерно так: (Обратите внимание, я думаю, что эта версия не подходит, так как для небольшого количества файлов она заполняет рабочую очередь - но в любом случае посмотрите следующую версию для лучшей версии...)
public void call(Iterator<Tuple2<Text, BytesWritable>> arg0) throws Exception {
int nThreads=5;
ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(threadPool);
int nJobs = 0;
while (arg0.hasNext()) {
++nJobs;
final Tuple2<Text, BytesWritable> tuple2 = arg0.next();
ecs.submit(new Callable<Void>() {
@Override Void call() {
System.out.println(tuple2._1().toString());
String path = "/home/suv/junk/sparkOutPut/"+tuple2._1().toString();
try(PrintWriter writer = new PrintWriter(path, "UTF-8") ) {
writer.println(new String(tuple2._2().getBytes()))
}
return null;
}
});
}
for(int i=0; i<nJobs; ++i) {
ecs.take().get();
}
}
Еще лучше начать писать свои файлы, как только у вас есть данные для первого, а не когда у вас есть данные для всех из них - и чтобы это письмо не блокировало поток (ы) вычисления.
Для этого вы разбиваете свое приложение на несколько частей, взаимодействующих через (потокобезопасную) очередь.
Код в конечном итоге выглядит примерно так:
public void main() {
SomeMultithreadedQueue<Data> queue = ...;
int nGeneratorThreads=1;
int nWriterThreads=5;
int nThreads = nGeneratorThreads + nWriterThreads;
ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(threadPool);
AtomicInteger completedGenerators = new AtomicInteger(0);
// Start some generator threads.
for(int i=0; ++i; i<nGeneratorThreads) {
ecs.submit( () -> {
while(...) {
Data d = ... ;
queue.push(d);
}
if(completedGenerators.incrementAndGet()==nGeneratorThreads) {
queue.push(null);
}
return null;
});
}
// Start some writer threads
for(int i=0; i<nWriterThreads; ++i) {
ecs.submit( () -> {
Data d
while((d = queue.take())!=null) {
String path = data.path();
try(PrintWriter writer = new PrintWriter(path, "UTF-8") ) {
writer.println(new String(data.getBytes()));
}
return null;
}
});
}
for(int i=0; i<nThreads; ++i) {
ecs.take().get();
}
}
Примечание. Я не предоставил реализацию класса очереди, вы можете легко обернуть стандартные Java-потоки, чтобы получить то, что вам нужно.
Еще многое можно сделать, чтобы уменьшить задержку и т. Д. - вот некоторые другие вещи, которые я использовал, чтобы сократить время...
даже не ждите, пока все данные будут сгенерированы для данного файла. Передайте другую очередь, содержащую пакеты байтов для записи.
Следите за распределением ресурсов - вы можете повторно использовать некоторые из своих буферов.
В nio есть некоторая задержка - вы можете получить некоторые улучшения производительности, используя C-записи, JNI и прямые буферы.
Переключение потоков может повредить, и задержка в очередях может повредить, так что вы можете захотеть немного сгруппировать данные. Сбалансировать это с 1 может быть сложно.