О том, как добавить новый столбец в существующий DataFrame со случайными значениями в Scala
У меня есть фрейм данных с файлом паркета, и я должен добавить новый столбец с некоторыми случайными данными, но мне нужны эти случайные данные, отличающиеся друг от друга. Это мой настоящий код, и текущая версия spark 1.5.1-cdh-5.5.2:
val mydf = sqlContext.read.parquet("some.parquet")
// mydf.count()
// 63385686
mydf.cache
val r = scala.util.Random
import org.apache.spark.sql.functions.udf
def myNextPositiveNumber :String = { (r.nextInt(Integer.MAX_VALUE) + 1 ).toString.concat("D")}
val myFunction = udf(myNextPositiveNumber _)
val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber))
с этим кодом у меня есть эти данные:
scala> myNewDF.select("myNewColumn").show(10,false)
+-----------+
|myNewColumn|
+-----------+
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
+-----------+
Похоже, что udf myNextPositiveNumber вызывается только один раз, не так ли?
Обновление подтверждено, есть только одно отдельное значение:
scala> myNewDF.select("myNewColumn").distinct.show(50,false)
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
...
+-----------+
|myNewColumn|
+-----------+
|889488717D |
+-----------+
что я делаю не так?
Обновление 2: наконец, с помощью @user6910411 у меня есть этот код:
val mydf = sqlContext.read.parquet("some.parquet")
// mydf.count()
// 63385686
mydf.cache
val r = scala.util.Random
import org.apache.spark.sql.functions.udf
val accum = sc.accumulator(1)
def myNextPositiveNumber():String = {
accum+=1
accum.value.toString.concat("D")
}
val myFunction = udf(myNextPositiveNumber _)
val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber))
myNewDF.select("myNewColumn").count
// 63385686
обновление 3
Фактический код генерирует такие данные:
scala> mydf.select("myNewColumn").show(5,false)
17/02/22 11:01:57 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+-----------+
|myNewColumn|
+-----------+
|2D |
|2D |
|2D |
|2D |
|2D |
+-----------+
only showing top 5 rows
Похоже, функция udf вызывается только один раз, не так ли? Мне нужен новый случайный элемент в этом столбце.
обновление 4 @user6910411
у меня есть этот фактический код, который увеличивает идентификатор, но он не объединяет окончательный символ, это странно. Это мой код:
import org.apache.spark.sql.functions.udf
val mydf = sqlContext.read.parquet("some.parquet")
mydf.cache
def myNextPositiveNumber():String = monotonically_increasing_id().toString().concat("D")
val myFunction = udf(myNextPositiveNumber _)
val myNewDF = mydf.withColumn("myNewColumn",expr(myNextPositiveNumber))
scala> myNewDF.select("myNewColumn").show(5,false)
17/02/22 12:00:02 WARN Executor: 1 block locks were not released by TID = 1:
[rdd_4_0]
+-----------+
|myNewColumn|
+-----------+
|0 |
|1 |
|2 |
|3 |
|4 |
+-----------+
Мне нужно что-то вроде:
+-----------+
|myNewColumn|
+-----------+
|1D |
|2D |
|3D |
|4D |
+-----------+
2 ответа
Spark >= 2,3
Можно отключить некоторые оптимизации, используя asNondeterministic
метод:
import org.apache.spark.sql.expressions.UserDefinedFunction
val f: UserDefinedFunction = ???
val fNonDeterministic: UserDefinedFunction = f.asNondeterministic
Пожалуйста, убедитесь, что вы понимаете гарантии, прежде чем использовать эту опцию.
Искра < 2.3
Функция, которая передается в udf, должна быть детерминированной (с возможным исключением SPARK-20586), и вызовы нулевых функций могут быть заменены константами. Если вы хотите генерировать случайные числа, используйте одну из встроенных функций:
rand
- Создать случайный столбец с независимыми и одинаково распределенными (iid) выборками из U [0.0, 1.0].randn
- Создать столбец с независимыми и идентично распределенными (iid) выборками из стандартного нормального распределения.
и преобразовать выходные данные для получения требуемого распределения, например:
(rand * Integer.MAX_VALUE).cast("bigint").cast("string")
Вы можете использовать monotonically_increasing_id
генерировать случайные значения.
Затем вы можете определить UDF для добавления к нему любой строки после приведения ее к String как monotonically_increasing_id
возвращает Long по умолчанию.
scala> var df = Seq(("Ron"), ("John"), ("Steve"), ("Brawn"), ("Rock"), ("Rick")).toDF("names")
+-----+
|names|
+-----+
| Ron|
| John|
|Steve|
|Brawn|
| Rock|
| Rick|
+-----+
scala> val appendD = spark.sqlContext.udf.register("appendD", (s: String) => s.concat("D"))
scala> df = df.withColumn("ID",monotonically_increasing_id).selectExpr("names","cast(ID as String) ID").withColumn("ID",appendD($"ID"))
+-----+---+
|names| ID|
+-----+---+
| Ron| 0D|
| John| 1D|
|Steve| 2D|
|Brawn| 3D|
| Rock| 4D|
| Rick| 5D|
+-----+---+