Иерархическая обработка данных в Apache Spark

У меня есть набор данных в Spark (v2.1.1) с 3 столбцами (как показано ниже), содержащими иерархические данные.

  • Моя цель - назначить добавочную нумерацию для каждой строки на основе иерархии родитель-потомок. Графически можно сказать, что иерархические данные представляют собой совокупность деревьев.
  • Согласно приведенной ниже таблице, у меня уже есть строки, сгруппированные на основе 'Global_ID'. Теперь я хотел бы генерировать столбец "Значение" в инкрементном порядке, но на основе иерархии данных из столбцов "Родитель" и "Дочерний".

Табличное представление (значение - желаемый результат):

    +-----------+--------+-------+         +-----------+--------+-------+-------+
    |      Current Dataset       |         |      Desired Dataset (Output)      |
    +-----------+--------+-------+         +-----------+--------+-------+-------+
    | Global_ID | Parent | Child |         | Global_ID | Parent | Child | Value |
    +-----------+--------+-------+         +-----------+--------+-------+-------+
    |       111 |    111 |   123 |         |       111 |    111 |   111 |     1 |
    |       111 |    135 |   246 |         |       111 |    111 |   123 |     2 |
    |       111 |    123 |   456 |         |       111 |    123 |   789 |     3 |
    |       111 |    123 |   789 |         |       111 |    123 |   456 |     4 |
    |       111 |    111 |   111 |         |       111 |    111 |   135 |     5 |
    |       111 |    135 |   468 |         |       111 |    135 |   246 |     6 |
    |       111 |    135 |   268 |         |       111 |    135 |   468 |     7 |
    |       111 |    268 |   321 |         |       111 |    135 |   268 |     8 |
    |       111 |    138 |   139 |         |       111 |    268 |   321 |     9 |
    |       111 |    111 |   135 |         |       111 |    111 |   138 |    10 |
    |       111 |    111 |   138 |         |       111 |    138 |   139 |    11 |
    |       222 |    222 |   654 |         |       222 |    222 |   222 |    12 |
    |       222 |    654 |   721 |         |       222 |    222 |   987 |    13 |
    |       222 |    222 |   222 |         |       222 |    222 |   654 |    14 |
    |       222 |    721 |   127 |         |       222 |    654 |   721 |    15 |
    |       222 |    222 |   987 |         |       222 |    721 |   127 |    16 |
    |       333 |    333 |   398 |         |       333 |    333 |   333 |    17 |
    |       333 |    333 |   498 |         |       333 |    333 |   398 |    18 |
    |       333 |    333 |   333 |         |       333 |    333 |   498 |    19 |
    |       333 |    333 |   598 |         |       333 |    333 |   598 |    20 |
    +-----------+--------+-------+         +-----------+--------+-------+-------+

Представление дерева (желаемое значение представлено рядом с каждым узлом):

                      +-----+                                           +-----+
                   1  | 111 |                                       17  | 333 |
                      +--+--+                                           +--+--+
                         |                                                 |
         +---------------+--------+-----------------+           +----------+----------+
         |                        |                 |           |          |          |
      +--v--+                  +--v--+           +--v--+     +--v--+    +--v--+    +--v--+
   2  | 123 |                5 | 135 |        10 | 138 |     | 398 |    | 498 |    | 598 |
      +--+--+                  +--+--+           +--+--+     +--+--+    +--+--+    +--+--+  
   +-----+-----+         +--------+--------+        |          18         19         20
   |           |         |        |        |        |  
+--v--+     +--v--+   +--v--+  +--v--+  +--v--+  +--v--+ 
| 789 |     | 456 |   | 246 |  | 468 |  | 268 |  | 139 |                 +-----+
+-----+     +-----+   +-----+  +-----+  +--+--+  +-----+             12  | 222 |
   3           4         6        7      8 |        11                   +--+--+
                                        +--v--+                             |
                                        | 321 |                      +------+-------+
                                        +--+--+                      |              |
                                           9                      +--v--+        +--v--+
                                                               13 | 987 |    14  | 654 |
                                                                  +--+--+        +--+--+
                                                                                    |
                                                                                 +--v--+
                                                                             15  | 721 |
                                                                                 +--+--+
                                                                                    |
                                                                                 +--v--+
                                                                             16  | 127 |
                                                                                 +--+--+

Фрагмент кода:

Dataset<Row> myDataset = spark
                .sql("select Global_ID, Parent, Child from RECORDS");

JavaPairRDD<Row,Long> finalDataset = myDataset.groupBy(new Column("Global_ID"))
    .agg(functions.sort_array(functions.collect_list(new Column("Parent").as("parent_col"))),
        functions.sort_array(functions.collect_list(new Column("Child").as("child_col"))))
    .orderBy(new Column("Global_ID"))
    .withColumn("vars", functions.explode(<Spark UDF>)
    .select(new Column("vars"),new Column("parent_col"),new Column("child_col"))
    .javaRDD().zipWithIndex();


// Sample UDF (TODO: Actual Implementation)   
spark.udf().register("computeValue",
                (<Column Names>) -> <functionality & implementation>,
                DataTypes.<xxx>);

После долгих исследований и изучения множества предложений в блогах, я попробовал следующие подходы, но безрезультатно добиться результата для моего сценария.

Технический стек:

  • Apache Spark (v2.1.1)

  • Java 8

  • AWS EMR Cluster (развертывание Spark App)


Объем данных:

  • Примерно ~20 миллионов строк в наборе данных

Подходы пробовали:

  1. Spark GraphX ​​+ GraphFrames:

    • Используя эту комбинацию, я смог добиться только связи между вершинами и ребрами, но она не подходит для моего варианта использования.
      Ссылка: https://graphframes.github.io/user-guide.html
  2. Spark GraphX ​​Pregel API:

    • Это самое близкое к достижению ожидаемого результата, но, к сожалению, я не смог найти фрагмент кода Java для этого. Пример, приведенный в одном из блогов, написан на Scala, с которым я плохо разбираюсь.
      Ссылка: https://dzone.com/articles/processing-hierarchical-data-using-spark-graphx-pr

Любые предложения по альтернативам (или) модификациям в существующих подходах были бы действительно полезны, так как я полностью теряюсь в поиске решения для этого варианта использования.

Ценю твою помощь! Спасибо!

1 ответ

Примечание: приведенное ниже решение относится к scala spark. Вы можете легко перевести на Java-код.

Проверь это. Я пытался сделать это с помощью Spark Sql, вы можете получить представление. По сути, идея заключается в сортировке дочерних, родительских и глобальных идентификаторов при их агрегировании и группировании. После того, как сгруппированы и отсортированы по globalid расширить остальные. Вы получите заказанную таблицу результатов, к которой позже сможете zipWithIndex добавить ранг (значение)

   import org.apache.spark.sql.SQLContext
   import org.apache.spark.sql.functions._
   import org.apache.spark.sql.expressions.UserDefinedFunction
   import org.apache.spark.sql.functions.udf

   val sqlContext = new SQLContext(sc)
   import sqlContext.implicits._

   val t = Seq((111,111,123), (111,111,111), (111,123,789), (111,268,321), (222,222,654), (222,222,222), (222,721,127), (333,333,398), (333,333,333), (333,333,598))
   val ddd = sc.parallelize(t).toDF
   val zip = udf((xs: Seq[Int], ys: Seq[Int]) => xs zip ys)
   val dd1 = ddd
    .groupBy($"_1")
    .agg(sort_array(collect_list($"_2")).as("v"),
         sort_array(collect_list($"_3")).as("w"))
    .orderBy(asc("_1"))
    .withColumn("vars", explode(zip($"v", $"w")))
    .select($"_1", $"vars._1", $"vars._2").rdd.zipWithIndex

  dd1.collect

Выход

    res24: Array[(org.apache.spark.sql.Row, Long)] = Array(([111,111,111],0), ([111,111,123],1), ([111,123,321],2),
([111,268,789],3), ([222,222,127],4), ([222,222,222],5), ([222,721,654],6),([333,333,333],7), ([333,333,398],8), ([333,333,598],9))
Другие вопросы по тегам