Как можно использовать параллельный массив?
Я пытаюсь использовать параллельные коллекции Scala для параллельной отправки некоторых вычислений. Поскольку входных данных много, я использую изменяемые массивы для хранения данных, чтобы избежать проблем с сборкой мусора. Это первоначальный подход, который я выбрал:
// initialize the reusable input data structure
val inputData = new Array[Array[Int]](Runtime.getRuntime.availableProcessors*ChunkSize)
for (i <- 0 until inputData.length) {
inputData(i) = new Array[Int](arraySize)
}
// process the input
while (haveMoreInput()) {
// read the input--must be sequential!
for (array <- 0 until inputData.length) {
for (index <- 0 until arraySize) {
array(index) = deserializeFromExternalSource()
}
}
// map the data in parallel
// note that the input data is NOT modified by longRuningProcess
val results = for (array <- inputData.par) yield {
longRunningProcess(array)
}
// use the results--must be sequential and ordered as input
for (result <- results.toArray) {
useResult(result)
}
}
Учитывая, что ParallelArray
базовый массив может быть безопасно повторно использован (а именно, изменен и использован в качестве базовой структуры другого ParallelArray
), вышеперечисленное должно работать как положено. Однако при запуске происходит сбой с ошибкой памяти:
*** Error in `*** Error in `java': double free or corruption (fasttop): <memory address> ***
Это якобы связано с тем, что параллельная коллекция напрямую использует массив, из которого она была создана; возможно, он пытается освободить этот массив, когда он выходит из области видимости. В любом случае, создание нового массива с каждым циклом, опять же, не вариант из-за ограничений памяти. Явно создавая var parInputData = inputData.par
как внутри, так и снаружи while
Цикл приводит к той же двойной ошибке без ошибок.
Я не могу просто сделать inputData
сама по себе параллельная коллекция, потому что она должна быть заполнена последовательно (попытавшись выполнить назначения для параллельной версии, я поняла, что назначения выполнялись не по порядку). Используя Vector
поскольку внешняя структура данных, кажется, работает для относительно небольших входных размеров (< 1000000 входных массивов), но приводит к исключениям служебных данных GC на больших входных данных.
Подход, который я в итоге использовал, заключался в Vector[Vector[Array[Int]]]
с внешним вектором, имеющим длину, равную количеству используемых параллельных нитей. Затем я вручную заполнил каждыйVector
с порцией массивов входных данных, а затем сделал параллельную карту по внешнему вектору.
Этот последний подход работает, но утомительно вручную разделять входные данные на чанки и добавлять эти чанки в параллельную коллекцию другого уровня. Есть ли способ позволить Scala повторно использовать изменяемый массив для параллельных операций?
РЕДАКТИРОВАТЬ: Сравнительный анализ параллельного векторного решения по сравнению с параллелизированным вручную решением с использованием синхронных очередей показал, что параллельный вектор был примерно на 50% медленнее. Мне интересно, если это просто накладные расходы на лучшую абстракцию или этот разрыв может быть уменьшен с помощью параллельных массивов, а не Vector
s; это привело бы к еще одному преимуществу использования массивов по сравнению с Vector
s.
1 ответ
На самом деле не имеет смысла разбивать ваши данные на куски, большая часть библиотеки Parallel Collections заключается в том, что она делает это для вас и выполняет намного лучшую работу, чем использование фиксированных размеров кусков. Кроме того, массивы массивов в JVM не похожи на массивы массивов в C, они больше похожи на массивы указателей на множество маленьких массивов, что делает их неэффективными.
Более элегантный способ решить эту проблему - использовать обычный Array
и использовать ParRange
оперировать этим. longRunningProcess
должно быть изменено для работы с одним элементом за раз:
val arraySize = ???
val inputData = Array[Int](arraySize)
val outputData = Array[ResultType](arraySize)
while(haveMoreInput()) {
for (i <- 0 until arraySize)
inputData(i) = deserializeFromExternalSource()
for (i <- (0 until arraySize).par)
outputData(i) = longRunningProcess(inputData(i))
outputData.foreach(useResult)
}
При этом используются только два больших массива, и никогда не выделяются новые массивы. ParArray.map
, ParArray.toArray
, а также Array.par
выделены новые массивы в исходном коде.
Мы все еще должны использовать фиксированный arraySize
чтобы мы не загружали больше данных в память, для которой у нас есть место. Лучшим решением было бы использование реактивных потоков, но они еще не готовы к производству.