Spark Dataframe Случайный UUID изменяется после каждого преобразования / действия

У меня есть Spark Dataframe со столбцом, который содержит сгенерированный UUID. Однако каждый раз, когда я выполняю действие или преобразование в кадре данных, он изменяет UUID на каждом этапе.

Как сгенерировать UUID только один раз, и после этого UUID останется статическим.

Ниже приведен пример кода для повторного создания моей проблемы:

def process(spark: SparkSession): Unit = {

  import spark.implicits._

  val sc = spark.sparkContext
  val sqlContext = spark.sqlContext
  sc.setLogLevel("OFF")

  // create dataframe
  val df = spark.createDataset(Array(("a", "1"), ("b", "2"), ("c", "3"))).toDF("col1", "col2")
  df.createOrReplaceTempView("df")
  df.show(false)

  // register an UDF that creates a random UUID
  val generateUUID = udf(() => UUID.randomUUID().toString)

  // generate UUID for new column
  val dfWithUuid = df.withColumn("new_uuid", generateUUID())
  dfWithUuid.show(false)
  dfWithUuid.show(false)    // uuid is different

  // new transformations also change the uuid
  val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2")+1)
  dfWithUuidWithNewCol.show(false)
}

Выход:

+----+----+
|col1|col2|
+----+----+
|a   |1   |
|b   |2   |
|c   |3   |
+----+----+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |a414e73b-24b8-4f64-8d21-f0bc56d3d290|
|b   |2   |f37935e5-0bfc-4863-b6dc-897662307e0a|
|c   |3   |e3aaf655-5a48-45fb-8ab5-22f78cdeaf26|
+----+----+------------------------------------+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |1c6597bf-f257-4e5f-be81-34a0efa0f6be|
|b   |2   |6efe4453-29a8-4b7f-9fa1-7982d2670bd6|
|c   |3   |2f7ddc1c-3e8c-4118-8e2c-8a6f526bee7e|
+----+----+------------------------------------+

+----+----+------------------------------------+----+
|col1|col2|new_uuid                            |col3|
+----+----+------------------------------------+----+
|a   |1   |00b85af8-711e-4b59-82e1-8d8e59d4c512|2.0 |
|b   |2   |94c3f2c6-9234-4fb3-b1c4-273a37171131|3.0 |
|c   |3   |1059fff2-b8f9-4cec-907d-ea181d5003a2|4.0 |
+----+----+------------------------------------+----+

Обратите внимание, что UUID отличается на каждом этапе.

4 ответа

Решение

Это ожидаемое поведение. Пользовательские функции должны быть детерминированными:

Пользовательские функции должны быть детерминированными. Из-за оптимизации повторяющиеся вызовы могут быть исключены, или функция может быть вызвана даже больше раз, чем это присутствует в запросе.

Если вы хотите включить недетерминированную функцию и сохранить выходные данные, вы должны записать промежуточные данные в постоянное хранилище и прочитать их обратно. Контрольные точки или кэширование могут работать в некоторых простых случаях, но в целом они не будут надежными.

Если восходящий процесс детерминирован (для начала есть случайное перемешивание), вы можете попробовать использовать rand функция с семенем, преобразовать в байтовый массив и передать UUID.nameUUIDFromBytes,

См. Также: О том, как добавить новый столбец в существующий DataFrame со случайными значениями в Scala.

Примечание: SPARK-20586 представлен deterministic флаг, который может отключить определенную оптимизацию, но не ясно, как она ведет себя, когда данные persisted и происходит потеря исполнителя.

Это очень старый вопрос, но дать людям понять, что сработало для меня. Это может кому-то помочь.

Вы можете использовать функцию expr, как показано ниже, для создания уникальных идентификаторов GUID, которые не меняются при преобразованиях.

import org.apache.spark.sql.functions._  
// create dataframe  
val df = spark.createDataset(Array(("a", "1"), ("b", "2"), ("c", "3"))).toDF("col1", "col2")   
df.createOrReplaceTempView("df")   
df.show(false)

// generate UUID for new column   
val dfWithUuid = df.withColumn("new_uuid", expr("uuid()"))
dfWithUuid.show(false)
dfWithUuid.show(false)    

// new transformations 
val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2")+1)
dfWithUuidWithNewCol.show(false)

Результат выглядит следующим образом:

+----+----+
|col1|col2|
+----+----+
|a   |1   |
|b   |2   |
|c   |3   |
+----+----+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|
|b   |2   |43882a79-8e7f-4002-9740-f22bc6b20db5|
|c   |3   |64bc741a-0d7c-430d-bfe2-a4838f10acd0|
+----+----+------------------------------------+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|
|b   |2   |43882a79-8e7f-4002-9740-f22bc6b20db5|
|c   |3   |64bc741a-0d7c-430d-bfe2-a4838f10acd0|
+----+----+------------------------------------+

+----+----+------------------------------------+----+
|col1|col2|new_uuid                            |col3|
+----+----+------------------------------------+----+
|a   |1   |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|2.0 |
|b   |2   |43882a79-8e7f-4002-9740-f22bc6b20db5|3.0 |
|c   |3   |64bc741a-0d7c-430d-bfe2-a4838f10acd0|4.0 |
+----+----+------------------------------------+----+

У меня есть версия pyspark:

      from pyspark.sql import functions as f

pdataDF=dataDF.withColumn("uuid_column",f.expr("uuid()"))
display(pdataDF)
pdataDF.write.mode("overwrite").saveAsTable("tempUuidCheck")

Попробуй это:

df.withColumn("XXXID", lit(java.util.UUID.randomUUID().toString))

он работает иначе:

val generateUUID = udf(() => java.util.UUID.randomUUID().toString)
df.withColumn("XXXCID", generateUUID() )

Надеюсь, это поможет.

Павел

Другие вопросы по тегам