SPARK: отключить большой RDD после того, как другой RDD будет полностью в памяти

Я хочу отключить RDD (RDD1) после того, как другой RDD (RDD2) полностью в памяти. У меня есть следующий код:

val RDD2 = MyProcessor.process(RDD1).persist(StorageLevel.MEMORY_ONLY_SER_2).
  setName("RDD2")

if (RDD2.count > 0) {
  RDD1.unpersist()
}

Тем не менее, RDD2.count становится очень медленным с большим набором данных. Есть ли лучший способ гарантировать, что RDD1 не будет работать после того, как RDD2 будет полностью в памяти?

Спасибо!

2 ответа

Подсчет RDD2 в вашем случае - очень дорогая операция. Вы не можете напрямую проверить, полностью ли загружен RDD2 в основную память. Сначала вы должны выполнить действие (как вы это сделали) на RDD2, а затем Spark сохранит RDD2 для последующих итераций. Однако вышеприведенный подход не совсем эффективен, так как вам нужно выполнить полное сканирование RDD2, и если RDD1 и RDD2 вместе находятся близко к превышению вашей доступной памяти, тогда ваше время выполнения будет намного медленнее.

Два альтернативных предложения, чтобы проверить, больше ли размер RDD2, чем 0:

  1. проверьте, не является ли СДР пустым (!RDD.isEmpty)
  2. проверьте, существует ли хотя бы один элемент (!RDD.take(1)==0 или!RDD.first()==0)

Вы кешируете RDD1 исключительно для ускорения MyProcessor.process(RDD1)? Выполняете ли вы какие-либо преобразования и действия на RDD1 после кэширования и перед вызовом MyProcessor.process(RDD1)? Это вещи, которые вы, возможно, придется посетить, потому что cache ленив, так что если в MyProcessor.process(RDD1) тогда вы выполняете только одно действие на RDD1 unpersist потом, тогда я не думаю, что вы сможете извлечь максимальную пользу из кеширования.

Другие вопросы по тегам