Почему использование памяти моего исполнителя зависло на 0?

У меня довольно простая работа Spark, которая выглядит так:

JavaPairRDD<Key,Value> rawData = newAccumuloRDD(...);
JavaPairRDD<Key,Value> indexSrc =
    rawData.filter(new IndexFilter()).cache();
JavaPairRDD<Key,Value> indexEntries =
    indexSrc.mapPartitionsToPair(new IndexBuilder(numPartitions));
JavaPairRDD<Key,Value> reverseIndexEntries =
    indexSrc.mapPartitionsToPair(new ReverseIndexBuilder(numPartitions));
JavaPairRDD<Key,Value> dataEntries =
    rawData.mapPartitionsToPair(new DataBuilder(numPartitions)).cache();

dataEntries.union(indexEntries)
  .union(reverseIndexEntries)
  .repartitionAndSortWithinPartitions(new PartitionedIndexRDDPartitioner(NUM_BINS))
  .saveAsNewAPIHadoopFile(pidxBulk.toString(), Key.class, Value.class,
      AccumuloFileOutputFormat.class, conf);

Где Key и Value - классы Apache Accumulo Key и Value (с использованием KryoSerializer).

Я не уверен, куда именно поместить вызовы cache(), или даже если они вообще нужны. Но я обеспокоен тем, что мои исполнители, кажется, не используют большую часть памяти, которую я им выделил:

Снимок экрана, показывающий использование нулевой памяти

А страница "Хранилище" в интерфейсе приложения пуста.

Я делаю что-то не так, или Spark решил, что не сможет ускорить эту работу, храня мои RDD?

1 ответ

Решение

Используемая память означает память, используемую для кэширования.

В вашем коде вы выполняете только одно действие, и indexSrc или dataEntries больше не используются, поэтому нет смысла его кэшировать.

Чтобы доказать это, вы можете добавить

indexSrc.count(); а также dataEntries.count(); после их объявления, а затем проверьте страницу исполнителя / хранилища.

JavaPairRDD<Key,Value> rawData = newAccumuloRDD(...);
JavaPairRDD<Key,Value> indexSrc = rawData.filter(new IndexFilter()).cache();
indexSrc.count();
JavaPairRDD<Key,Value> indexEntries = indexSrc.mapPartitionsToPair(new IndexBuilder(numPartitions));
JavaPairRDD<Key,Value> reverseIndexEntries = indexSrc.mapPartitionsToPair(new ReverseIndexBuilder(numPartitions));
JavaPairRDD<Key,Value> dataEntries = rawData.mapPartitionsToPair(new DataBuilder(numPartitions)).cache();
dataEntries.count();

dataEntries.union(indexEntries)
  .union(reverseIndexEntries)
  .repartitionAndSortWithinPartitions(new PartitionedIndexRDDPartitioner(NUM_BINS))
  .saveAsNewAPIHadoopFile(pidxBulk.toString(), Key.class, Value.class,
      AccumuloFileOutputFormat.class, conf);
Другие вопросы по тегам