Выполнить Spark CrossJoin за раздел

Я экспериментирую с некоторыми пользовательскими дистанционными функциями для обучения без контроля, где мне нужно сравнить каждый вектор в наборе данных со всеми другими векторами. Так как я не могу вызвать фрейм данных из фрейма данных (или rdd из фрейма данных или любой их комбинации) - я считаю, что CrossJoin - единственный путь.

Очевидно, что CrossJoin запускает создание nxn набор данных, где есть n векторы в наборе данных. Тем не менее, я могу жить с CrossJoin, выполняя один вектор за раз, так как каждый вектор оценивается независимо. Таким образом, есть ли способ выполнить crossJoin (не ленивое выполнение, а фактическое вычисление) и последующие вычисления для каждого раздела, так что кластеру фактически не нужно оценивать и хранить все nxn матрица?

Изменить: небольшой пример, чтобы продемонстрировать проблему:

// X and Y are transformed using VectorAssembler to generate the vector V
vdf.show(20,200)
+-------+-------+-----------------+
|      X|      Y|                V|
+-------+-------+-----------------+
|  623.0| 9869.0|   [623.0,9869.0]|
| 5287.0| 9217.0|  [5287.0,9217.0]|
| 9369.0| 4000.0|  [9369.0,4000.0]|
| 3053.0| 7106.0|  [3053.0,7106.0]|
| 7281.0| 7859.0|  [7281.0,7859.0]|
|20000.0|30000.0|[20000.0,30000.0]|
+-------+-------+-----------------+

//Next, we do a crossJoin to create a matrix of nxn
val cv = vdf.select($"V".alias("V1")).crossJoin(vdf.select($"V".alias("V2")))
cv.sort("V1").show(40,200)

cv: org.apache.spark.sql.DataFrame = [V1: vector, V2: vector]
+-----------------+-----------------+
|               V1|               V2|
+-----------------+-----------------+
|   [623.0,9869.0]|   [623.0,9869.0]|
|   [623.0,9869.0]|  [9369.0,4000.0]|
|   [623.0,9869.0]|  [3053.0,7106.0]|
|   [623.0,9869.0]|[20000.0,30000.0]|
|   [623.0,9869.0]|  [5287.0,9217.0]|
|   [623.0,9869.0]|  [7281.0,7859.0]|
|  [3053.0,7106.0]|  [3053.0,7106.0]|
|  [3053.0,7106.0]|   [623.0,9869.0]|
|  [3053.0,7106.0]|  [9369.0,4000.0]|
|  [3053.0,7106.0]|  [5287.0,9217.0]|
|  [3053.0,7106.0]|[20000.0,30000.0]|
|  [3053.0,7106.0]|  [7281.0,7859.0]|
..........
..........

Тем не менее, вышеупомянутого можно избежать, если мы можем сделать crossJoin для каждого уникального V1 и последующих вычислений на перекрестном наборе данных. Позже, массив или RDD результирующих кадров данных могут быть объединены, чтобы сравнить оценки каждого вектора.

Я пытался контролировать выполнение, назначая uniqueId каждому вектору перед crossJoin, извлекая uniqueIds в виде массива, а затем перебирая массив, выполняя crossJoin и другие операции над кадрами в цикле for / foreach / map, но похоже, что все они выполняются внутри драйвера, если я выполняю итерацию по локальному массиву (что сбивает с толку, поскольку я представлял, что операции с циклом данных внутри цикла будут распределяться, но, по-видимому, это не так).

Я использую EMR с Spark 2.3.0 (Scala 2.11.8).

0 ответов

Другие вопросы по тегам