Почему использование памяти моего исполнителя зависло на 0?
У меня довольно простая работа Spark, которая выглядит так:
JavaPairRDD<Key,Value> rawData = newAccumuloRDD(...);
JavaPairRDD<Key,Value> indexSrc =
rawData.filter(new IndexFilter()).cache();
JavaPairRDD<Key,Value> indexEntries =
indexSrc.mapPartitionsToPair(new IndexBuilder(numPartitions));
JavaPairRDD<Key,Value> reverseIndexEntries =
indexSrc.mapPartitionsToPair(new ReverseIndexBuilder(numPartitions));
JavaPairRDD<Key,Value> dataEntries =
rawData.mapPartitionsToPair(new DataBuilder(numPartitions)).cache();
dataEntries.union(indexEntries)
.union(reverseIndexEntries)
.repartitionAndSortWithinPartitions(new PartitionedIndexRDDPartitioner(NUM_BINS))
.saveAsNewAPIHadoopFile(pidxBulk.toString(), Key.class, Value.class,
AccumuloFileOutputFormat.class, conf);
Где Key и Value - классы Apache Accumulo Key и Value (с использованием KryoSerializer).
Я не уверен, куда именно поместить вызовы cache(), или даже если они вообще нужны. Но я обеспокоен тем, что мои исполнители, кажется, не используют большую часть памяти, которую я им выделил:
А страница "Хранилище" в интерфейсе приложения пуста.
Я делаю что-то не так, или Spark решил, что не сможет ускорить эту работу, храня мои RDD?
1 ответ
Используемая память означает память, используемую для кэширования.
В вашем коде вы выполняете только одно действие, и indexSrc или dataEntries больше не используются, поэтому нет смысла его кэшировать.
Чтобы доказать это, вы можете добавить
indexSrc.count();
а также dataEntries.count();
после их объявления, а затем проверьте страницу исполнителя / хранилища.
JavaPairRDD<Key,Value> rawData = newAccumuloRDD(...);
JavaPairRDD<Key,Value> indexSrc = rawData.filter(new IndexFilter()).cache();
indexSrc.count();
JavaPairRDD<Key,Value> indexEntries = indexSrc.mapPartitionsToPair(new IndexBuilder(numPartitions));
JavaPairRDD<Key,Value> reverseIndexEntries = indexSrc.mapPartitionsToPair(new ReverseIndexBuilder(numPartitions));
JavaPairRDD<Key,Value> dataEntries = rawData.mapPartitionsToPair(new DataBuilder(numPartitions)).cache();
dataEntries.count();
dataEntries.union(indexEntries)
.union(reverseIndexEntries)
.repartitionAndSortWithinPartitions(new PartitionedIndexRDDPartitioner(NUM_BINS))
.saveAsNewAPIHadoopFile(pidxBulk.toString(), Key.class, Value.class,
AccumuloFileOutputFormat.class, conf);