Пользовательский разделитель Spark для СДР путей S3
У меня есть RDD[(Long, String)]
дорожек S3 (ведро + ключ) с их размерами. Я хочу разделить его так, чтобы каждый раздел получал пути, размеры которых суммируются примерно до одного и того же значения. Таким образом, когда я читаю контент для этих путей, каждый раздел должен иметь примерно одинаковый объем данных для работы. Я написал этот пользовательский разделитель для этого.
import org.apache.spark.Partitioner
import scala.collection.mutable.PriorityQueue
class S3Partitioner(partitions: Int, val totalSize: Long) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
require(totalSize >= 0, s"Number of totalSize ($totalSize) cannot be negative.")
val pq = PriorityQueue[(Int, Long)]()
(0 until partitions).foreach { partition =>
pq.enqueue((partition, totalSize / partitions))
}
def getPartition(key: Any): Int = key match {
case k: Long =>
val (partition, capacityLeft) = pq.dequeue
pq.enqueue((partition, capacityLeft - k))
partition
case _ => 0
}
def numPartitions: Int = partitions
override def equals(other: Any): Boolean = other match {
case p: S3Partitioner =>
p.totalSize == totalSize && p.numPartitions == numPartitions
case _ => false
}
override def hashCode: Int = {
(972 * numPartitions.hashCode) ^ (792 * totalSize.hashCode)
}
}
Предполагается, что разделитель будет работать лучше, если он получает RDD с ключами (размерами), отсортированными в порядке убывания. Когда я попытался использовать его, я начал получать эту ошибку в коде, который работал раньше:
Cause: java.io.NotSerializableException: scala.collection.mutable.PriorityQueue$ResizableArrayAccess
Вот как я использую это:
val pathsWithSize: RDD[(Long, String)] = ...
val totalSize = pathsWithSize.map(_._1).reduce(_ + _)
new PairRDDFunctions(pathsWithSize)
.partitionBy(new S3Partitioner(5 * sc.defaultParallelism, totalSize))
.mapPartitions { iter =>
iter.map { case (_, path) => readS3(path) }
}
И я не уверен, как это исправить. Буду признателен за любую помощь.