Умножение матриц в Apache Spark
Я пытаюсь выполнить матричное умножение, используя Apache Spark и Java.
У меня есть 2 основных вопроса:
- Как создать RDD, который может представлять матрицу в Apache Spark?
- Как умножить два таких СДР?
1 ответ
Все зависит от входных данных и измерений, но, вообще говоря, вы не хотите RDD
но одна из распределенных структур данных из org.apache.spark.mllib.linalg.distributed
, На данный момент он предоставляет четыре различных реализации DistributedMatrix
IndexedRowMatrix
- может быть создан непосредственно изRDD[IndexedRow]
гдеIndexedRow
состоят из индекса строки иorg.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.{Vectors, Matrices} import org.apache.spark.mllib.linalg.distributed.{IndexedRowMatrix, IndexedRow} val rows = sc.parallelize(Seq( (0L, Array(1.0, 0.0, 0.0)), (0L, Array(0.0, 1.0, 0.0)), (0L, Array(0.0, 0.0, 1.0))) ).map{case (i, xs) => IndexedRow(i, Vectors.dense(xs))} val indexedRowMatrix = new IndexedRowMatrix(rows)
RowMatrix
- похожий наIndexedRowMatrix
но без значимых индексов строк. Может быть создан непосредственно изRDD[org.apache.spark.mllib.linalg.Vector]
import org.apache.spark.mllib.linalg.distributed.RowMatrix val rowMatrix = new RowMatrix(rows.map(_.vector))
BlockMatrix
- может быть создан изRDD[((Int, Int), Matrix)]
где первый элемент кортежа содержит координаты блока, а второй - локальныйorg.apache.spark.mllib.linalg.Matrix
val eye = Matrices.sparse( 3, 3, Array(0, 1, 2, 3), Array(0, 1, 2), Array(1, 1, 1)) val blocks = sc.parallelize(Seq( ((0, 0), eye), ((1, 1), eye), ((2, 2), eye))) val blockMatrix = new BlockMatrix(blocks, 3, 3, 9, 9)
CoordinateMatrix
- может быть создан изRDD[MatrixEntry]
гдеMatrixEntry
состоит из строки, столбца и значения.import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} val entries = sc.parallelize(Seq( (0, 0, 3.0), (2, 0, -5.0), (3, 2, 1.0), (4, 1, 6.0), (6, 2, 2.0), (8, 1, 4.0)) ).map{case (i, j, v) => MatrixEntry(i, j, v)} val coordinateMatrix = new CoordinateMatrix(entries, 9, 3)
Первые две реализации поддерживают умножение на локальный Matrix
:
val localMatrix = Matrices.dense(3, 2, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
indexedRowMatrix.multiply(localMatrix).rows.collect
// Array(IndexedRow(0,[1.0,4.0]), IndexedRow(0,[2.0,5.0]),
// IndexedRow(0,[3.0,6.0]))
а третий можно умножить на другой BlockMatrix
до тех пор, пока количество столбцов в блоке в этой матрице совпадает с количеством строк в блоке другой матрицы. CoordinateMatrix
не поддерживает умножения, но довольно легко создавать и преобразовывать в другие типы распределенных матриц:
blockMatrix.multiply(coordinateMatrix.toBlockMatrix(3, 3))
У каждого типа есть свои сильные и слабые стороны, и при использовании разреженных или плотных элементов необходимо учитывать некоторые дополнительные факторы (Vectors
или заблокировать Matrices
). Умножение на локальную матрицу обычно предпочтительнее, так как не требует дорогостоящего тасования.
Вы можете найти более подробную информацию о каждом типе в руководстве по типам данных MLlib.