Выполнить Spark CrossJoin за раздел
Я экспериментирую с некоторыми пользовательскими дистанционными функциями для обучения без контроля, где мне нужно сравнить каждый вектор в наборе данных со всеми другими векторами. Так как я не могу вызвать фрейм данных из фрейма данных (или rdd из фрейма данных или любой их комбинации) - я считаю, что CrossJoin - единственный путь.
Очевидно, что CrossJoin запускает создание nxn
набор данных, где есть n
векторы в наборе данных. Тем не менее, я могу жить с CrossJoin, выполняя один вектор за раз, так как каждый вектор оценивается независимо. Таким образом, есть ли способ выполнить crossJoin (не ленивое выполнение, а фактическое вычисление) и последующие вычисления для каждого раздела, так что кластеру фактически не нужно оценивать и хранить все nxn
матрица?
Изменить: небольшой пример, чтобы продемонстрировать проблему:
// X and Y are transformed using VectorAssembler to generate the vector V
vdf.show(20,200)
+-------+-------+-----------------+
| X| Y| V|
+-------+-------+-----------------+
| 623.0| 9869.0| [623.0,9869.0]|
| 5287.0| 9217.0| [5287.0,9217.0]|
| 9369.0| 4000.0| [9369.0,4000.0]|
| 3053.0| 7106.0| [3053.0,7106.0]|
| 7281.0| 7859.0| [7281.0,7859.0]|
|20000.0|30000.0|[20000.0,30000.0]|
+-------+-------+-----------------+
//Next, we do a crossJoin to create a matrix of nxn
val cv = vdf.select($"V".alias("V1")).crossJoin(vdf.select($"V".alias("V2")))
cv.sort("V1").show(40,200)
cv: org.apache.spark.sql.DataFrame = [V1: vector, V2: vector]
+-----------------+-----------------+
| V1| V2|
+-----------------+-----------------+
| [623.0,9869.0]| [623.0,9869.0]|
| [623.0,9869.0]| [9369.0,4000.0]|
| [623.0,9869.0]| [3053.0,7106.0]|
| [623.0,9869.0]|[20000.0,30000.0]|
| [623.0,9869.0]| [5287.0,9217.0]|
| [623.0,9869.0]| [7281.0,7859.0]|
| [3053.0,7106.0]| [3053.0,7106.0]|
| [3053.0,7106.0]| [623.0,9869.0]|
| [3053.0,7106.0]| [9369.0,4000.0]|
| [3053.0,7106.0]| [5287.0,9217.0]|
| [3053.0,7106.0]|[20000.0,30000.0]|
| [3053.0,7106.0]| [7281.0,7859.0]|
..........
..........
Тем не менее, вышеупомянутого можно избежать, если мы можем сделать crossJoin для каждого уникального V1 и последующих вычислений на перекрестном наборе данных. Позже, массив или RDD результирующих кадров данных могут быть объединены, чтобы сравнить оценки каждого вектора.
Я пытался контролировать выполнение, назначая uniqueId каждому вектору перед crossJoin, извлекая uniqueIds в виде массива, а затем перебирая массив, выполняя crossJoin и другие операции над кадрами в цикле for / foreach / map, но похоже, что все они выполняются внутри драйвера, если я выполняю итерацию по локальному массиву (что сбивает с толку, поскольку я представлял, что операции с циклом данных внутри цикла будут распределяться, но, по-видимому, это не так).
Я использую EMR с Spark 2.3.0 (Scala 2.11.8).