Умножение матриц в Apache Spark

Я пытаюсь выполнить матричное умножение, используя Apache Spark и Java.

У меня есть 2 основных вопроса:

  1. Как создать RDD, который может представлять матрицу в Apache Spark?
  2. Как умножить два таких СДР?

1 ответ

Решение

Все зависит от входных данных и измерений, но, вообще говоря, вы не хотите RDD но одна из распределенных структур данных из org.apache.spark.mllib.linalg.distributed, На данный момент он предоставляет четыре различных реализации DistributedMatrix

  • IndexedRowMatrix - может быть создан непосредственно из RDD[IndexedRow] где IndexedRow состоят из индекса строки и org.apache.spark.mllib.linalg.Vector

    import org.apache.spark.mllib.linalg.{Vectors, Matrices}
    import org.apache.spark.mllib.linalg.distributed.{IndexedRowMatrix,
      IndexedRow}
    
    val rows =  sc.parallelize(Seq(
      (0L, Array(1.0, 0.0, 0.0)),
      (0L, Array(0.0, 1.0, 0.0)),
      (0L, Array(0.0, 0.0, 1.0)))
    ).map{case (i, xs) => IndexedRow(i, Vectors.dense(xs))}
    
    val indexedRowMatrix = new IndexedRowMatrix(rows)
    
  • RowMatrix - похожий на IndexedRowMatrix но без значимых индексов строк. Может быть создан непосредственно из RDD[org.apache.spark.mllib.linalg.Vector]

    import org.apache.spark.mllib.linalg.distributed.RowMatrix
    
    val rowMatrix = new RowMatrix(rows.map(_.vector))      
    
  • BlockMatrix - может быть создан из RDD[((Int, Int), Matrix)] где первый элемент кортежа содержит координаты блока, а второй - локальный org.apache.spark.mllib.linalg.Matrix

    val eye = Matrices.sparse(
      3, 3, Array(0, 1, 2, 3), Array(0, 1, 2), Array(1, 1, 1))
    
    val blocks = sc.parallelize(Seq(
       ((0, 0), eye), ((1, 1), eye), ((2, 2), eye)))
    
    val blockMatrix = new BlockMatrix(blocks, 3, 3, 9, 9)
    
  • CoordinateMatrix - может быть создан из RDD[MatrixEntry] где MatrixEntry состоит из строки, столбца и значения.

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix,
      MatrixEntry}
    
    val entries = sc.parallelize(Seq(
       (0, 0, 3.0), (2, 0, -5.0), (3, 2, 1.0),
       (4, 1, 6.0), (6, 2, 2.0), (8, 1, 4.0))
    ).map{case (i, j, v) => MatrixEntry(i, j, v)}
    
    val coordinateMatrix = new CoordinateMatrix(entries, 9, 3)
    

Первые две реализации поддерживают умножение на локальный Matrix:

val localMatrix = Matrices.dense(3, 2, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))

indexedRowMatrix.multiply(localMatrix).rows.collect
// Array(IndexedRow(0,[1.0,4.0]), IndexedRow(0,[2.0,5.0]),
//   IndexedRow(0,[3.0,6.0]))

а третий можно умножить на другой BlockMatrix до тех пор, пока количество столбцов в блоке в этой матрице совпадает с количеством строк в блоке другой матрицы. CoordinateMatrix не поддерживает умножения, но довольно легко создавать и преобразовывать в другие типы распределенных матриц:

blockMatrix.multiply(coordinateMatrix.toBlockMatrix(3, 3))

У каждого типа есть свои сильные и слабые стороны, и при использовании разреженных или плотных элементов необходимо учитывать некоторые дополнительные факторы (Vectors или заблокировать Matrices). Умножение на локальную матрицу обычно предпочтительнее, так как не требует дорогостоящего тасования.

Вы можете найти более подробную информацию о каждом типе в руководстве по типам данных MLlib.

Другие вопросы по тегам