regroupBy, по-видимому, принудительно пересчитывает Spark DataFrame

Я испытываю странное поведение при использовании Spark 2.3.0 со Scala, и мне нужен совет.

У меня есть DataFrame, давайте назовем его elemsDF, который выглядит так:

+--------------------+--------------------+--------+----------+-----------+------------------+------+
|           ref_el_id|              ref_id|time_rel|signalname|signalvalue|standard_deviation|p_date|
+--------------------+--------------------+--------+----------+-----------+------------------+------+
|897cc061-3d02-422...|b6ddddeb-ea26-485...|     0.1|        s1|        1.0|               0.0|201611|
|aecb9ee1-5a7b-4d5...|b6ddddeb-ea26-485...|     0.0|        s1|        0.0|               0.0|201611|
|36e51ce6-f19d-4ff...|f66d637a-5af1-4eb...|     0.1|        s1|        1.0|               0.0|201611|
|2fec8f90-f033-431...|f66d637a-5af1-4eb...|     0.0|        s1|        0.0|               0.0|201611|
+--------------------+--------------------+--------+----------+-----------+------------------+------+

Колонны ref_el_id а также ref_id являются случайно сгенерированными UUID, созданными в некоторый момент UserDefinedFunction,

import java.util.UUID

def genUuidUdf: UserDefinedFunction = udf(() => UUID.randomUUID().toString)

Теперь это нормально, что каждый раз, когда я показываю elemsDF, UUID пересчитываются.

elemsDF.show()
+--------------------+--------------------+--------+----------+-----------+------------------+------+
|           ref_el_id|              ref_id|time_rel|signalname|signalvalue|standard_deviation|p_date|
+--------------------+--------------------+--------+----------+-----------+------------------+------+
|897cc061-3d02-422...|b6ddddeb-ea26-485...|     0.1|        s1|        1.0|               0.0|201611|
|aecb9ee1-5a7b-4d5...|b6ddddeb-ea26-485...|     0.0|        s1|        0.0|               0.0|201611|
|36e51ce6-f19d-4ff...|f66d637a-5af1-4eb...|     0.1|        s1|        1.0|               0.0|201611|
|2fec8f90-f033-431...|f66d637a-5af1-4eb...|     0.0|        s1|        0.0|               0.0|201611|
+--------------------+--------------------+--------+----------+-----------+------------------+------+

elemsDF.show()
+--------------------+--------------------+--------+----------+-----------+------------------+------+
|           ref_el_id|              ref_id|time_rel|signalname|signalvalue|standard_deviation|p_date|
+--------------------+--------------------+--------+----------+-----------+------------------+------+
|e1bfd5a9-91fc-422...|364c9a75-3990-427...|     0.1|        s1|        1.0|               0.0|201611|
|6bfaa133-ee1e-4e9...|364c9a75-3990-427...|     0.0|        s1|        0.0|               0.0|201611|
|6f4fc033-3aa1-4e9...|d3e4f33c-2e3c-423...|     0.1|        s1|        1.0|               0.0|201611|
|d70e66a4-da5d-49c...|d3e4f33c-2e3c-423...|     0.0|        s1|        0.0|               0.0|201611|
+--------------------+--------------------+--------+----------+-----------+------------------+------+

Пока все хорошо, мы сохраняем DataFrame, и это фиксирует значения, рассчитанные для UUID.

elemsDF.persist()

поскольку persist() это ленивая операция, для ее активации требуется действие. Бег show() снова это делается.

elemsDF.show()
+--------------------+--------------------+--------+----------+-----------+------------------+------+
|           ref_el_id|              ref_id|time_rel|signalname|signalvalue|standard_deviation|p_date|
+--------------------+--------------------+--------+----------+-----------+------------------+------+
|4a23d926-9bfa-484...|0d67060b-8a76-492...|     0.1|        s1|        1.0|               0.0|201611|
|b1536569-917b-44b...|0d67060b-8a76-492...|     0.0|        s1|        0.0|               0.0|201611|
|325d8e49-c6e6-4b0...|bc575378-0216-46c...|     0.1|        s1|        1.0|               0.0|201611|
|bbcc9c19-95e0-45c...|bc575378-0216-46c...|     0.0|        s1|        0.0|               0.0|201611|
+--------------------+--------------------+--------+----------+-----------+------------------+------+

elemsDF.show()
+--------------------+--------------------+--------+----------+-----------+------------------+------+
|           ref_el_id|              ref_id|time_rel|signalname|signalvalue|standard_deviation|p_date|
+--------------------+--------------------+--------+----------+-----------+------------------+------+
|4a23d926-9bfa-484...|0d67060b-8a76-492...|     0.1|        s1|        1.0|               0.0|201611|
|b1536569-917b-44b...|0d67060b-8a76-492...|     0.0|        s1|        0.0|               0.0|201611|
|325d8e49-c6e6-4b0...|bc575378-0216-46c...|     0.1|        s1|        1.0|               0.0|201611|
|bbcc9c19-95e0-45c...|bc575378-0216-46c...|     0.0|        s1|        0.0|               0.0|201611|
+--------------------+--------------------+--------+----------+-----------+------------------+------+

Так что это имеет смысл. Значения теперь зафиксированы и не пересчитываются снова и снова.

Теперь странное поведение; Я пытаюсь использовать groupBy и функция агрегации, однако, даже после сохранения DataFrame, выполнение которого, по-видимому, вызывает внутренний пересчет.

elemsDF.groupBy("ref_id").agg(count("signalname")).show
+--------------------+---------+                                                
|              ref_id|n_signals|
+--------------------+---------+
|d8ec0aa2-8097-40a...|        2|
|e4071400-8298-410...|        2|
+--------------------+---------+

elemsDF.groupBy("ref_id").agg(count("signalname")).show
+--------------------+-----------------+                                        
|              ref_id|count(signalname)|
+--------------------+-----------------+
|cc206b29-c9ad-49d...|                2|
|454019c4-80ec-449...|                2|
+--------------------+-----------------+

Может ли кто-нибудь объяснить мне, что здесь происходит и что я должен делать, чтобы избежать такого поведения?

Редактирование: Спасибо @user10938362, но связанный вопрос заключается в том, чтобы не получить случайные значения, ожидаемые в новом столбце, созданном UDF, и genUuidUdf.asNonDeterministic не имеет значения. show без persist по-прежнему производит разные UUID каждый раз и даже после сохранения groupBy все еще заставляет пересчитывать и генерировать новые случайные значения.

Изменить: мой вопрос был помечен как дубликат, и хотя он не является дубликатом предложенного вопроса, это дубликат другого вопроса.

TL;DR это: не пишите недетерминированные UDF.

0 ответов

Другие вопросы по тегам