Проблемы с памятью при запуске задания Spark на относительно большом входе

Я управляю искровым кластером с 50 машинами. Каждая машина представляет собой виртуальную машину с 8-ядерным процессором и 50 ГБ памяти (41, кажется, доступен для Spark).

Я работаю на нескольких папках ввода, я оцениваю размер сжатого файла ~250 ГБ.

Хотя мне кажется, что количество и конфигурация компьютеров, которые я использую, кажется достаточной, после примерно 40 минут выполнения задания я могу увидеть следующие ошибки в журналах:

2558733 [Result resolver thread-2] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 345.0 in stage 1.0 (TID 345, hadoop-w-3.c.taboola-qa-01.internal): java.lang.OutOfMemoryError: Java heap space

java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149)
java.lang.StringCoding.decode(StringCoding.java:193)
java.lang.String.<init>(String.java:416)
java.lang.String.<init>(String.java:481)
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:699)
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:660)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

а также:

2653545 [Result resolver thread-2] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 122.1 in stage 1.0 (TID 392, hadoop-w-22.c.taboola-qa-01.internal): java.lang.OutOfMemoryError: GC overhead limit exceeded

java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149)
java.lang.StringCoding.decode(StringCoding.java:193)
java.lang.String.<init>(String.java:416)
java.lang.String.<init>(String.java:481)
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:699)
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:660)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Как мне отладить такую ​​проблему?

РЕДАКТИРОВАТЬ: Я нашел основную причину проблемы. Вот этот кусок кода:

    private static final int MAX_FILE_SIZE = 40194304;
    ....
    ....
        JavaPairRDD<String, List<String>> typedData = filePaths.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, List<String>>() {
            @Override
            public Iterable<Tuple2<String, List<String>>> call(Iterator<String> filesIterator) throws Exception {
                List<Tuple2<String, List<String>>> res = new ArrayList<>();
                String fileType = null;
                List<String> linesList = null;
                if (filesIterator != null) {
                    while (filesIterator.hasNext()) {
                        try {
                            Path file = new Path(filesIterator.next());
                            // filter non-trc files
                            if (!file.getName().startsWith("1")) {
                                continue;
                            }
                            fileType = getType(file.getName());
                            Configuration conf = new Configuration();
                            CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
                            CompressionCodec codec = compressionCodecs.getCodec(file);
                            FileSystem fs = file.getFileSystem(conf);
                            ContentSummary contentSummary = fs.getContentSummary(file);
                            long fileSize = contentSummary.getLength();
                            InputStream in = fs.open(file);
                            if (codec != null) {
                                in = codec.createInputStream(in);
                            } else {
                                throw new IOException();
                            }

                            byte[] buffer = new byte[MAX_FILE_SIZE];

                            BufferedInputStream bis = new BufferedInputStream(in, BUFFER_SIZE);
                            int count = 0;
                            int bytesRead = 0;
                            try {
                                while ((bytesRead = bis.read(buffer, count, BUFFER_SIZE)) != -1) {
                                    count += bytesRead;
                                }
                            } catch (Exception e) {
                                log.error("Error reading file: " + file.getName() + ", trying to read " + BUFFER_SIZE + " bytes at offset: " + count);
                                throw e;
                            }

                            Iterable<String> lines = Splitter.on("\n").split(new String(buffer, "UTF-8").trim());
                            linesList = Lists.newArrayList(lines);

                            // get rid of first line in file

                            Iterator<String> it = linesList.iterator();
                            if (it.hasNext()) {
                                it.next();
                                it.remove();
                            }
                            //res.add(new Tuple2<>(fileType,linesList));
                        } finally {
                            res.add(new Tuple2<>(fileType, linesList));
                        }


                    }

                }
                return res;
            }

В частности, выделение буфера размером 40 МБ для каждого файла для чтения содержимого файла с использованием BufferedInputStream. Это приводит к тому, что в какой-то момент память стека заканчивается.

Дело в том:

  • Если я буду читать построчно (что не требует буфера), это будет очень неэффективное чтение
  • Если я выделю один буфер и повторно использую его для каждого чтения файла - возможно ли это в смысле параллелизма? Или это будет перезаписано несколькими потоками?

Любые предложения приветствуются...

РЕДАКТИРОВАТЬ 2: Исправлена ​​первая проблема с памятью, перемещая распределение байтового массива за пределы итератора, поэтому он снова используется всеми элементами раздела. Но есть все еще новая строка (буфер, "UTF-8"). Trim()), которая создается для цели разделения - это объект, который также создается каждый раз. Я мог бы использовать stringbuffer/builder, но тогда как мне установить кодировку charset без объекта String?

1 ответ

В конце концов я изменил код следующим образом:

       // Transform list of files to list of all files' content in lines grouped by type
        JavaPairRDD<String,List<String>> typedData = filePaths.mapToPair(new PairFunction<String, String, List<String>>() {
            @Override
            public Tuple2<String, List<String>> call(String filePath) throws Exception {
                Tuple2<String, List<String>> tuple = null;
                try {
                    String fileType = null;
                    List<String> linesList = new ArrayList<String>();
                    Configuration conf = new Configuration();
                    CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
                    Path path = new Path(filePath);
                    fileType = getType(path.getName());
                    tuple = new Tuple2<String, List<String>>(fileType, linesList);

                    // filter non-trc files
                    if (!path.getName().startsWith("1")) {
                        return tuple;
                    }

                    CompressionCodec codec = compressionCodecs.getCodec(path);
                    FileSystem fs = path.getFileSystem(conf);
                    InputStream in = fs.open(path);
                    if (codec != null) {
                        in = codec.createInputStream(in);
                    } else {
                        throw new IOException();
                    }

                    BufferedReader r = new BufferedReader(new InputStreamReader(in, "UTF-8"), BUFFER_SIZE);
                    // Get rid of the first line in the file
                    r.readLine();

                    // Read all lines
                    String line;
                    while ((line = r.readLine()) != null) {
                        linesList.add(line);
                    }
                } catch (IOException e) { // Filtering of files whose reading went wrong
                    log.error("Reading of the file " + filePath + " went wrong: " + e.getMessage());
                } finally {
                    return tuple;
                }
            }

        });

Так что теперь я не использую буфер размером 40M, а скорее строю список строк динамически, используя список массивов. Это решило мою текущую проблему с памятью, но теперь я получил другие странные ошибки, которые не сработали. Сообщите о них в другом вопросе...

Другие вопросы по тегам