SPARK: отключить большой RDD после того, как другой RDD будет полностью в памяти
Я хочу отключить RDD (RDD1) после того, как другой RDD (RDD2) полностью в памяти. У меня есть следующий код:
val RDD2 = MyProcessor.process(RDD1).persist(StorageLevel.MEMORY_ONLY_SER_2).
setName("RDD2")
if (RDD2.count > 0) {
RDD1.unpersist()
}
Тем не менее, RDD2.count становится очень медленным с большим набором данных. Есть ли лучший способ гарантировать, что RDD1 не будет работать после того, как RDD2 будет полностью в памяти?
Спасибо!
2 ответа
Подсчет RDD2 в вашем случае - очень дорогая операция. Вы не можете напрямую проверить, полностью ли загружен RDD2 в основную память. Сначала вы должны выполнить действие (как вы это сделали) на RDD2, а затем Spark сохранит RDD2 для последующих итераций. Однако вышеприведенный подход не совсем эффективен, так как вам нужно выполнить полное сканирование RDD2, и если RDD1 и RDD2 вместе находятся близко к превышению вашей доступной памяти, тогда ваше время выполнения будет намного медленнее.
Два альтернативных предложения, чтобы проверить, больше ли размер RDD2, чем 0:
- проверьте, не является ли СДР пустым (!RDD.isEmpty)
- проверьте, существует ли хотя бы один элемент (!RDD.take(1)==0 или!RDD.first()==0)
Вы кешируете RDD1 исключительно для ускорения MyProcessor.process(RDD1)
? Выполняете ли вы какие-либо преобразования и действия на RDD1 после кэширования и перед вызовом MyProcessor.process(RDD1)
? Это вещи, которые вы, возможно, придется посетить, потому что cache
ленив, так что если в MyProcessor.process(RDD1)
тогда вы выполняете только одно действие на RDD1 unpersist
потом, тогда я не думаю, что вы сможете извлечь максимальную пользу из кеширования.