Параллельная обработка сбора данных, превышающих объем памяти

Есть ли простой способ использовать параллельные коллекции scala без загрузки полной коллекции в память?

Например, у меня есть большая коллекция, и я хотел бы выполнить определенную операцию (сгиб) параллельно только на небольшом фрагменте, который умещается в памяти, а не на другом фрагменте и т. Д., И, наконец, рекомбинировать результаты из всех фрагментов.

Я знаю, что актеров можно было бы использовать, но было бы неплохо использовать пар-коллекции.

Я написал решение, но это не приятно:

  def split[A](list: Iterable[A], chunkSize: Int): Iterable[Iterable[A]] = {
    new Iterator[Iterable[A]] {
      var rest = list
      def hasNext = !rest.isEmpty
      def next = {
        val chunk = rest.take(chunkSize)
        rest = rest.drop(chunkSize)
        chunk
      }
    }.toIterable
  }                                               

  def foldPar[A](acc: A)(list: Iterable[A], chunkSize: Int, combine: ((A, A) => A)): A = {
    val chunks: Iterable[Iterable[A]] = split(list, chunkSize)
    def combineChunk: ((A,Iterable[A]) => A) = { case (res, entries) => entries.par.fold(res)(combine) }
    chunks.foldLeft(acc)(combineChunk)
  }                                               

  val chunkSize = 10000000                        
    val x = 1 to chunkSize*10                 

    def sum: ((Int,Int) => Int) = {case (acc,n) => acc + n }

    foldPar(0)(x,chunkSize,sum)

1 ответ

Решение

Ваша идея очень аккуратная, и жаль, что такой функции уже нет (AFAIK).

Я просто перефразировал вашу идею в немного более короткий код. Во-первых, я чувствую, что для параллельного свертывания полезно использовать концепцию моноида - это структура с ассоциативной операцией и нулевым элементом. Ассоциативность важна, потому что мы не знаем, в каком порядке мы объединяем результаты, которые вычисляются параллельно. И нулевой элемент важен для того, чтобы мы могли разбить вычисления на блоки и начать складывать каждое из них с нуля. В этом нет ничего нового, просто fold для коллекций Scala ожидает.

// The function defined by Monoid's apply must be associative
// and zero its identity element.
trait Monoid[A]
  extends Function2[A,A,A]
{
  val zero: A
}

Далее Скала Iteratorу уже есть полезный метод grouped(Int): GroupedIterator[Seq[A]] который разрезает итератор на последовательности фиксированного размера. Это очень похоже на ваш split, Это позволяет нам разрезать входные данные на блоки фиксированного размера, а затем применить к ним методы параллельной коллекции Scala:

def parFold[A](c: Iterator[A], blockSize: Int)(implicit monoid: Monoid[A]): A =
  c.grouped(blockSize).map(_.par.fold(monoid.zero)(monoid))
                      .fold(monoid.zero)(monoid);

Мы сворачиваем каждый блок, используя структуру параллельных коллекций, а затем (без какого-либо распараллеливания) объединяем промежуточные результаты.

Пример:

// Example:
object SumMonoid extends Monoid[Long] {
  override val zero: Long = 0;
  override def apply(x: Long, y: Long) = x + y;
}
val it = Iterator.range(1, 10000001).map(_.toLong)
println(parFold(it, 100000)(SumMonoid));
Другие вопросы по тегам