Обновление массива через параллельные коллекции Scala

У меня есть этот массив HashMap, определенный как ниже

var distinctElementsDefinitionMap: scala.collection.mutable.ArrayBuffer[HashMap[String, Int]] = new scala.collection.mutable.ArrayBuffer[HashMap[String, Int]](300) with scala.collection.mutable.SynchronizedBuffer[HashMap[String, Int]]

Теперь у меня есть параллельная коллекция из 300 элементов

val max_length = 300
val columnArray = (0 until max_length).toParArray
import scala.collection.parallel.ForkJoinTaskSupport
columnArray.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(100))
columnArray foreach(i => {
    // Do Some Computation and get a HashMap
    var distinctElementsMap: HashMap[String, Int] = //Some Value
    //This line might result in Concurrent Access Exception
    distinctElementsDefinitionMap.update(i, distinctElementsMap)
})

Сейчас я выполняю задачу интенсивного вычисления в пределах foreach петля на columnArray определено выше. После завершения вычислений я бы хотел, чтобы каждый из потоков обновил определенную запись distinctElementsDefinitionMap массив. Каждый поток будет обновлять только определенное значение индекса, уникальное для потока, выполняющего его. Я хочу знать, безопасно ли это обновление записи массива с возможностью одновременной записи в него нескольких потоков? Если нет synchronized способ сделать это так, что это потокобезопасно? Благодарю вас!

Обновление: кажется, это действительно не безопасный способ сделать это. Я получаю java.util.ConcurrentModificationExceptionЛюбые советы о том, как этого избежать, используя параллельные коллекции.

1 ответ

Использование .groupBy Операция, насколько я могу судить, распараллелена (в отличие от некоторых других методов, таких как .sorted)

case class Row(a: String, b: String, c: String)
val data = Vector(
  Row("foo", "", ""), 
  Row("bar", "", ""), 
  Row("foo", "", "")
)

data.par.groupBy(x => x.a).seq
// Map(bar -> ParVector(Row(bar,,)), foo -> ParVector(Row(foo,,), Row(foo,,)))

Надеюсь, у вас есть идея.

Альтернативно, если ваша оперативная память позволяет вам распараллеливать обработку по каждому столбцу, а не по строке, она должна быть более эффективной, чем ваш текущий подход (меньше конфликтов).

val columnsCount = 3 // 300 in your case
Vector.range(0, columnsCount).par.map { column => 
  data.groupBy(row => row(column))
}.seq 

Хотя у вас, вероятно, будут проблемы с памятью даже с одним столбцом (8M строк может быть довольно много).

Другие вопросы по тегам