Как можно использовать параллельный массив?

Я пытаюсь использовать параллельные коллекции Scala для параллельной отправки некоторых вычислений. Поскольку входных данных много, я использую изменяемые массивы для хранения данных, чтобы избежать проблем с сборкой мусора. Это первоначальный подход, который я выбрал:

// initialize the reusable input data structure
val inputData = new Array[Array[Int]](Runtime.getRuntime.availableProcessors*ChunkSize)
for (i <- 0 until inputData.length) {
  inputData(i) = new Array[Int](arraySize)
}

// process the input
while (haveMoreInput()) {
  // read the input--must be sequential!
  for (array <- 0 until inputData.length) {
    for (index <- 0 until arraySize) {
      array(index) = deserializeFromExternalSource()
    }
  }
  // map the data in parallel
  // note that the input data is NOT modified by longRuningProcess
  val results = for (array <- inputData.par) yield {
    longRunningProcess(array)
  }
  // use the results--must be sequential and ordered as input
  for (result <- results.toArray) {
    useResult(result)
  }
}

Учитывая, что ParallelArrayбазовый массив может быть безопасно повторно использован (а именно, изменен и использован в качестве базовой структуры другого ParallelArray), вышеперечисленное должно работать как положено. Однако при запуске происходит сбой с ошибкой памяти:

*** Error in `*** Error in `java': double free or corruption (fasttop): <memory address> ***

Это якобы связано с тем, что параллельная коллекция напрямую использует массив, из которого она была создана; возможно, он пытается освободить этот массив, когда он выходит из области видимости. В любом случае, создание нового массива с каждым циклом, опять же, не вариант из-за ограничений памяти. Явно создавая var parInputData = inputData.par как внутри, так и снаружи while Цикл приводит к той же двойной ошибке без ошибок.

Я не могу просто сделать inputData сама по себе параллельная коллекция, потому что она должна быть заполнена последовательно (попытавшись выполнить назначения для параллельной версии, я поняла, что назначения выполнялись не по порядку). Используя Vector поскольку внешняя структура данных, кажется, работает для относительно небольших входных размеров (< 1000000 входных массивов), но приводит к исключениям служебных данных GC на больших входных данных.

Подход, который я в итоге использовал, заключался в Vector[Vector[Array[Int]]]с внешним вектором, имеющим длину, равную количеству используемых параллельных нитей. Затем я вручную заполнил каждыйVector с порцией массивов входных данных, а затем сделал параллельную карту по внешнему вектору.

Этот последний подход работает, но утомительно вручную разделять входные данные на чанки и добавлять эти чанки в параллельную коллекцию другого уровня. Есть ли способ позволить Scala повторно использовать изменяемый массив для параллельных операций?

РЕДАКТИРОВАТЬ: Сравнительный анализ параллельного векторного решения по сравнению с параллелизированным вручную решением с использованием синхронных очередей показал, что параллельный вектор был примерно на 50% медленнее. Мне интересно, если это просто накладные расходы на лучшую абстракцию или этот разрыв может быть уменьшен с помощью параллельных массивов, а не Vectors; это привело бы к еще одному преимуществу использования массивов по сравнению с Vectors.

1 ответ

На самом деле не имеет смысла разбивать ваши данные на куски, большая часть библиотеки Parallel Collections заключается в том, что она делает это для вас и выполняет намного лучшую работу, чем использование фиксированных размеров кусков. Кроме того, массивы массивов в JVM не похожи на массивы массивов в C, они больше похожи на массивы указателей на множество маленьких массивов, что делает их неэффективными.

Более элегантный способ решить эту проблему - использовать обычный Array и использовать ParRange оперировать этим. longRunningProcess должно быть изменено для работы с одним элементом за раз:

val arraySize = ???

val inputData = Array[Int](arraySize)
val outputData = Array[ResultType](arraySize)

while(haveMoreInput()) {
  for (i <- 0 until arraySize)
    inputData(i) = deserializeFromExternalSource()
  for (i <- (0 until arraySize).par)
    outputData(i) = longRunningProcess(inputData(i))
  outputData.foreach(useResult)
}

При этом используются только два больших массива, и никогда не выделяются новые массивы. ParArray.map, ParArray.toArray, а также Array.par выделены новые массивы в исходном коде.

Мы все еще должны использовать фиксированный arraySize чтобы мы не загружали больше данных в память, для которой у нас есть место. Лучшим решением было бы использование реактивных потоков, но они еще не готовы к производству.

Другие вопросы по тегам