Обновление массива через параллельные коллекции Scala
У меня есть этот массив HashMap, определенный как ниже
var distinctElementsDefinitionMap: scala.collection.mutable.ArrayBuffer[HashMap[String, Int]] = new scala.collection.mutable.ArrayBuffer[HashMap[String, Int]](300) with scala.collection.mutable.SynchronizedBuffer[HashMap[String, Int]]
Теперь у меня есть параллельная коллекция из 300 элементов
val max_length = 300
val columnArray = (0 until max_length).toParArray
import scala.collection.parallel.ForkJoinTaskSupport
columnArray.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(100))
columnArray foreach(i => {
// Do Some Computation and get a HashMap
var distinctElementsMap: HashMap[String, Int] = //Some Value
//This line might result in Concurrent Access Exception
distinctElementsDefinitionMap.update(i, distinctElementsMap)
})
Сейчас я выполняю задачу интенсивного вычисления в пределах foreach
петля на columnArray
определено выше. После завершения вычислений я бы хотел, чтобы каждый из потоков обновил определенную запись distinctElementsDefinitionMap
массив. Каждый поток будет обновлять только определенное значение индекса, уникальное для потока, выполняющего его. Я хочу знать, безопасно ли это обновление записи массива с возможностью одновременной записи в него нескольких потоков? Если нет synchronized
способ сделать это так, что это потокобезопасно? Благодарю вас!
Обновление: кажется, это действительно не безопасный способ сделать это. Я получаю java.util.ConcurrentModificationException
Любые советы о том, как этого избежать, используя параллельные коллекции.
1 ответ
Использование .groupBy
Операция, насколько я могу судить, распараллелена (в отличие от некоторых других методов, таких как .sorted
)
case class Row(a: String, b: String, c: String)
val data = Vector(
Row("foo", "", ""),
Row("bar", "", ""),
Row("foo", "", "")
)
data.par.groupBy(x => x.a).seq
// Map(bar -> ParVector(Row(bar,,)), foo -> ParVector(Row(foo,,), Row(foo,,)))
Надеюсь, у вас есть идея.
Альтернативно, если ваша оперативная память позволяет вам распараллеливать обработку по каждому столбцу, а не по строке, она должна быть более эффективной, чем ваш текущий подход (меньше конфликтов).
val columnsCount = 3 // 300 in your case
Vector.range(0, columnsCount).par.map { column =>
data.groupBy(row => row(column))
}.seq
Хотя у вас, вероятно, будут проблемы с памятью даже с одним столбцом (8M строк может быть довольно много).