regroupBy, по-видимому, принудительно пересчитывает Spark DataFrame
Я испытываю странное поведение при использовании Spark 2.3.0 со Scala, и мне нужен совет.
У меня есть DataFrame, давайте назовем его elemsDF
, который выглядит так:
+--------------------+--------------------+--------+----------+-----------+------------------+------+
| ref_el_id| ref_id|time_rel|signalname|signalvalue|standard_deviation|p_date|
+--------------------+--------------------+--------+----------+-----------+------------------+------+
|897cc061-3d02-422...|b6ddddeb-ea26-485...| 0.1| s1| 1.0| 0.0|201611|
|aecb9ee1-5a7b-4d5...|b6ddddeb-ea26-485...| 0.0| s1| 0.0| 0.0|201611|
|36e51ce6-f19d-4ff...|f66d637a-5af1-4eb...| 0.1| s1| 1.0| 0.0|201611|
|2fec8f90-f033-431...|f66d637a-5af1-4eb...| 0.0| s1| 0.0| 0.0|201611|
+--------------------+--------------------+--------+----------+-----------+------------------+------+
Колонны ref_el_id
а также ref_id
являются случайно сгенерированными UUID, созданными в некоторый момент UserDefinedFunction
,
import java.util.UUID
def genUuidUdf: UserDefinedFunction = udf(() => UUID.randomUUID().toString)
Теперь это нормально, что каждый раз, когда я показываю elemsDF
, UUID пересчитываются.
elemsDF.show()
+--------------------+--------------------+--------+----------+-----------+------------------+------+
| ref_el_id| ref_id|time_rel|signalname|signalvalue|standard_deviation|p_date|
+--------------------+--------------------+--------+----------+-----------+------------------+------+
|897cc061-3d02-422...|b6ddddeb-ea26-485...| 0.1| s1| 1.0| 0.0|201611|
|aecb9ee1-5a7b-4d5...|b6ddddeb-ea26-485...| 0.0| s1| 0.0| 0.0|201611|
|36e51ce6-f19d-4ff...|f66d637a-5af1-4eb...| 0.1| s1| 1.0| 0.0|201611|
|2fec8f90-f033-431...|f66d637a-5af1-4eb...| 0.0| s1| 0.0| 0.0|201611|
+--------------------+--------------------+--------+----------+-----------+------------------+------+
elemsDF.show()
+--------------------+--------------------+--------+----------+-----------+------------------+------+
| ref_el_id| ref_id|time_rel|signalname|signalvalue|standard_deviation|p_date|
+--------------------+--------------------+--------+----------+-----------+------------------+------+
|e1bfd5a9-91fc-422...|364c9a75-3990-427...| 0.1| s1| 1.0| 0.0|201611|
|6bfaa133-ee1e-4e9...|364c9a75-3990-427...| 0.0| s1| 0.0| 0.0|201611|
|6f4fc033-3aa1-4e9...|d3e4f33c-2e3c-423...| 0.1| s1| 1.0| 0.0|201611|
|d70e66a4-da5d-49c...|d3e4f33c-2e3c-423...| 0.0| s1| 0.0| 0.0|201611|
+--------------------+--------------------+--------+----------+-----------+------------------+------+
Пока все хорошо, мы сохраняем DataFrame, и это фиксирует значения, рассчитанные для UUID.
elemsDF.persist()
поскольку persist()
это ленивая операция, для ее активации требуется действие. Бег show()
снова это делается.
elemsDF.show()
+--------------------+--------------------+--------+----------+-----------+------------------+------+
| ref_el_id| ref_id|time_rel|signalname|signalvalue|standard_deviation|p_date|
+--------------------+--------------------+--------+----------+-----------+------------------+------+
|4a23d926-9bfa-484...|0d67060b-8a76-492...| 0.1| s1| 1.0| 0.0|201611|
|b1536569-917b-44b...|0d67060b-8a76-492...| 0.0| s1| 0.0| 0.0|201611|
|325d8e49-c6e6-4b0...|bc575378-0216-46c...| 0.1| s1| 1.0| 0.0|201611|
|bbcc9c19-95e0-45c...|bc575378-0216-46c...| 0.0| s1| 0.0| 0.0|201611|
+--------------------+--------------------+--------+----------+-----------+------------------+------+
elemsDF.show()
+--------------------+--------------------+--------+----------+-----------+------------------+------+
| ref_el_id| ref_id|time_rel|signalname|signalvalue|standard_deviation|p_date|
+--------------------+--------------------+--------+----------+-----------+------------------+------+
|4a23d926-9bfa-484...|0d67060b-8a76-492...| 0.1| s1| 1.0| 0.0|201611|
|b1536569-917b-44b...|0d67060b-8a76-492...| 0.0| s1| 0.0| 0.0|201611|
|325d8e49-c6e6-4b0...|bc575378-0216-46c...| 0.1| s1| 1.0| 0.0|201611|
|bbcc9c19-95e0-45c...|bc575378-0216-46c...| 0.0| s1| 0.0| 0.0|201611|
+--------------------+--------------------+--------+----------+-----------+------------------+------+
Так что это имеет смысл. Значения теперь зафиксированы и не пересчитываются снова и снова.
Теперь странное поведение; Я пытаюсь использовать groupBy
и функция агрегации, однако, даже после сохранения DataFrame, выполнение которого, по-видимому, вызывает внутренний пересчет.
elemsDF.groupBy("ref_id").agg(count("signalname")).show
+--------------------+---------+
| ref_id|n_signals|
+--------------------+---------+
|d8ec0aa2-8097-40a...| 2|
|e4071400-8298-410...| 2|
+--------------------+---------+
elemsDF.groupBy("ref_id").agg(count("signalname")).show
+--------------------+-----------------+
| ref_id|count(signalname)|
+--------------------+-----------------+
|cc206b29-c9ad-49d...| 2|
|454019c4-80ec-449...| 2|
+--------------------+-----------------+
Может ли кто-нибудь объяснить мне, что здесь происходит и что я должен делать, чтобы избежать такого поведения?
Редактирование: Спасибо @user10938362, но связанный вопрос заключается в том, чтобы не получить случайные значения, ожидаемые в новом столбце, созданном UDF, и genUuidUdf.asNonDeterministic
не имеет значения. show
без persist
по-прежнему производит разные UUID каждый раз и даже после сохранения groupBy
все еще заставляет пересчитывать и генерировать новые случайные значения.
Изменить: мой вопрос был помечен как дубликат, и хотя он не является дубликатом предложенного вопроса, это дубликат другого вопроса.
TL;DR это: не пишите недетерминированные UDF.