Параллельная обработка сбора данных, превышающих объем памяти
Есть ли простой способ использовать параллельные коллекции scala без загрузки полной коллекции в память?
Например, у меня есть большая коллекция, и я хотел бы выполнить определенную операцию (сгиб) параллельно только на небольшом фрагменте, который умещается в памяти, а не на другом фрагменте и т. Д., И, наконец, рекомбинировать результаты из всех фрагментов.
Я знаю, что актеров можно было бы использовать, но было бы неплохо использовать пар-коллекции.
Я написал решение, но это не приятно:
def split[A](list: Iterable[A], chunkSize: Int): Iterable[Iterable[A]] = {
new Iterator[Iterable[A]] {
var rest = list
def hasNext = !rest.isEmpty
def next = {
val chunk = rest.take(chunkSize)
rest = rest.drop(chunkSize)
chunk
}
}.toIterable
}
def foldPar[A](acc: A)(list: Iterable[A], chunkSize: Int, combine: ((A, A) => A)): A = {
val chunks: Iterable[Iterable[A]] = split(list, chunkSize)
def combineChunk: ((A,Iterable[A]) => A) = { case (res, entries) => entries.par.fold(res)(combine) }
chunks.foldLeft(acc)(combineChunk)
}
val chunkSize = 10000000
val x = 1 to chunkSize*10
def sum: ((Int,Int) => Int) = {case (acc,n) => acc + n }
foldPar(0)(x,chunkSize,sum)
1 ответ
Ваша идея очень аккуратная, и жаль, что такой функции уже нет (AFAIK).
Я просто перефразировал вашу идею в немного более короткий код. Во-первых, я чувствую, что для параллельного свертывания полезно использовать концепцию моноида - это структура с ассоциативной операцией и нулевым элементом. Ассоциативность важна, потому что мы не знаем, в каком порядке мы объединяем результаты, которые вычисляются параллельно. И нулевой элемент важен для того, чтобы мы могли разбить вычисления на блоки и начать складывать каждое из них с нуля. В этом нет ничего нового, просто fold
для коллекций Scala ожидает.
// The function defined by Monoid's apply must be associative
// and zero its identity element.
trait Monoid[A]
extends Function2[A,A,A]
{
val zero: A
}
Далее Скала Iterator
у уже есть полезный метод grouped(Int): GroupedIterator[Seq[A]]
который разрезает итератор на последовательности фиксированного размера. Это очень похоже на ваш split
, Это позволяет нам разрезать входные данные на блоки фиксированного размера, а затем применить к ним методы параллельной коллекции Scala:
def parFold[A](c: Iterator[A], blockSize: Int)(implicit monoid: Monoid[A]): A =
c.grouped(blockSize).map(_.par.fold(monoid.zero)(monoid))
.fold(monoid.zero)(monoid);
Мы сворачиваем каждый блок, используя структуру параллельных коллекций, а затем (без какого-либо распараллеливания) объединяем промежуточные результаты.
Пример:
// Example:
object SumMonoid extends Monoid[Long] {
override val zero: Long = 0;
override def apply(x: Long, y: Long) = x + y;
}
val it = Iterator.range(1, 10000001).map(_.toLong)
println(parFold(it, 100000)(SumMonoid));