Лучший способ записать огромное количество файлов

Я пишу много файлов, как показано ниже.

public void call(Iterator<Tuple2<Text, BytesWritable>> arg0)
        throws Exception {
    // TODO Auto-generated method stub

    while (arg0.hasNext()) {
        Tuple2<Text, BytesWritable> tuple2 = arg0.next();
        System.out.println(tuple2._1().toString());
        PrintWriter writer = new PrintWriter("/home/suv/junk/sparkOutPut/"+tuple2._1().toString(), "UTF-8");
        writer.println(new String(tuple2._2().getBytes()));
        writer.close();
    }
}

Есть ли лучший способ для записи файлов.. без закрытия или создания печатающего устройства каждый раз.

2 ответа

Решение

Нет лучшего способа записи большого количества файлов. То, что вы делаете, по сути является интенсивным вводом / выводом.

ОБНОВЛЕНИЕ - @ Майкл Андерсон прав, я думаю. Использование нескольких потоков для записи файлов (вероятно) значительно ускорит процесс. Тем не менее, ввод-вывод по-прежнему будет самым узким местом в нескольких отношениях:

  • Создание, открытие и закрытие файлов включает доступ и обновление метаданных файлов и каталогов. Это влечет за собой нетривиальный процессор.

  • Данные файла и изменения метаданных должны быть записаны на диск. Это возможно несколько записей на диск.

  • Для каждого записанного файла есть как минимум 3 системных вызова.

  • Тогда есть нити, накладывающие накладные расходы.

Если количество данных, записываемых в каждый файл, не является значительным (несколько килобайт на файл), я сомневаюсь, что такие методы, как использование NIO, прямые буферы, JNI и т. Д., Будут полезны. Реальные узкие места будут в ядре: операции с файловой системой и низкоуровневый дисковый ввод-вывод.


... не закрывая и не создавая принтер каждый раз.

Вам нужно создать новый PrintWriter (или же Writer или же OutputStream) для каждого файла.

Тем не менее, это...

  writer.println(new String(tuple2._2().getBytes()));

... выглядит довольно своеобразно. Вы кажетесь:

  • призвание getBytes() на String (?),
  • преобразование байтового массива в String
  • вызывая println() метод на String который скопирует это, и преобразует это обратно в байты прежде, чем наконец вывести их.

Что дает? Какой смысл преобразования String -> bytes -> String?

Я бы просто сделал это:

  writer.println(tuple2._2());

Это должно быть быстрее, хотя я бы не ожидал, что процентное ускорение будет таким большим.

Я предполагаю, что вы выбрали самый быстрый способ. Потому что все знают, что самое быстрое - это лучшее;)

Один простой способ - это использовать кучу потоков, чтобы писать за вас. Однако вы не получите большой выгоды от этого, если ваша файловая система не будет хорошо масштабироваться. (Я использую эту технику в кластерных системах на основе Luster, и в случаях, когда "много файлов" может означать 10 тыс. - в этом случае многие записи будут выполняться на разных серверах / дисках)

Код будет выглядеть примерно так: (Обратите внимание, я думаю, что эта версия не подходит, так как для небольшого количества файлов она заполняет рабочую очередь - но в любом случае посмотрите следующую версию для лучшей версии...)

public void call(Iterator<Tuple2<Text, BytesWritable>> arg0) throws Exception {
    int nThreads=5;
    ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
    ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(threadPool);

    int nJobs = 0;

    while (arg0.hasNext()) {
        ++nJobs;
        final Tuple2<Text, BytesWritable> tuple2 = arg0.next();
        ecs.submit(new Callable<Void>() {
          @Override Void call() {
             System.out.println(tuple2._1().toString());
             String path = "/home/suv/junk/sparkOutPut/"+tuple2._1().toString();
             try(PrintWriter writer = new PrintWriter(path, "UTF-8") ) {
               writer.println(new String(tuple2._2().getBytes()))
             }
             return null;
          }
       });
    }
    for(int i=0; i<nJobs; ++i) {
       ecs.take().get();
    }
}

Еще лучше начать писать свои файлы, как только у вас есть данные для первого, а не когда у вас есть данные для всех из них - и чтобы это письмо не блокировало поток (ы) вычисления.

Для этого вы разбиваете свое приложение на несколько частей, взаимодействующих через (потокобезопасную) очередь.

Код в конечном итоге выглядит примерно так:

public void main() {
  SomeMultithreadedQueue<Data> queue = ...;

  int nGeneratorThreads=1;
  int nWriterThreads=5;
  int nThreads = nGeneratorThreads + nWriterThreads;

  ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
  ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(threadPool);

  AtomicInteger completedGenerators = new AtomicInteger(0);

  // Start some generator threads.
  for(int i=0; ++i; i<nGeneratorThreads) {
    ecs.submit( () -> { 
      while(...) { 
        Data d = ... ;
        queue.push(d);
      }
      if(completedGenerators.incrementAndGet()==nGeneratorThreads) {
        queue.push(null);
      }
      return null;
   });
  }

  // Start some writer threads
  for(int i=0; i<nWriterThreads; ++i) {
    ecs.submit( () -> { 
      Data d
      while((d = queue.take())!=null) {
        String path = data.path();
        try(PrintWriter writer = new PrintWriter(path, "UTF-8") ) {
           writer.println(new String(data.getBytes()));
        }
        return null;
      }
    });
  }

  for(int i=0; i<nThreads; ++i) {
    ecs.take().get();
  }
}

Примечание. Я не предоставил реализацию класса очереди, вы можете легко обернуть стандартные Java-потоки, чтобы получить то, что вам нужно.

Еще многое можно сделать, чтобы уменьшить задержку и т. Д. - вот некоторые другие вещи, которые я использовал, чтобы сократить время...

  1. даже не ждите, пока все данные будут сгенерированы для данного файла. Передайте другую очередь, содержащую пакеты байтов для записи.

  2. Следите за распределением ресурсов - вы можете повторно использовать некоторые из своих буферов.

  3. В nio есть некоторая задержка - вы можете получить некоторые улучшения производительности, используя C-записи, JNI и прямые буферы.

  4. Переключение потоков может повредить, и задержка в очередях может повредить, так что вы можете захотеть немного сгруппировать данные. Сбалансировать это с 1 может быть сложно.

Другие вопросы по тегам