Ошибка кодера при попытке сопоставить строку информационного кадра с обновленной строкой
Когда я пытаюсь сделать то же самое в моем коде, как указано ниже
dataframe.map(row => {
val row1 = row.getAs[String](1)
val make = if (row1.toLowerCase == "tesla") "S" else row1
Row(row(0),make,row(2))
})
Я взял вышеупомянутую ссылку отсюда: Scala: Как я могу заменить значение в Dataframs, используя scala, но я получаю ошибку кодировщика как
Невозможно найти кодировщик для типа, хранящегося в наборе данных. Примитивные типы (Int, S tring и т. Д.) И типы Product (классы дел) поддерживаются путем импорта spark.im plicits._ Поддержка сериализации других типов будет добавлена в будущих выпусках.
Примечание: я использую spark 2.0!
2 ответа
Здесь нет ничего неожиданного. Вы пытаетесь использовать код, который был написан с помощью Spark 1.x и больше не поддерживается в Spark 2.0:
- в 1.х
DataFrame.map
является((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
- в 2.х
Dataset[Row].map
является((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]
Честно говоря, в 1.х тоже не было особого смысла. Независимо от версии вы можете просто использовать DataFrame
API:
import org.apache.spark.sql.functions.{when, lower}
val df = Seq(
(2012, "Tesla", "S"), (1997, "Ford", "E350"),
(2015, "Chevy", "Volt")
).toDF("year", "make", "model")
df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
Если вы действительно хотите использовать map
вы должны использовать статически типизированный Dataset
:
import spark.implicits._
case class Record(year: Int, make: String, model: String)
df.as[Record].map {
case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
case rec => rec
}
или, по крайней мере, вернуть объект, который будет иметь неявный кодер:
df.map {
case Row(year: Int, make: String, model: String) =>
(year, if(make.toLowerCase == "tesla") "S" else make, model)
}
Наконец, если по какой-то совершенно безумной причине вы действительно хотите нанести на карту Dataset[Row]
Вы должны предоставить необходимый кодер:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
StructField("year", IntegerType),
StructField("make", StringType),
StructField("model", StringType)
))
val encoder = RowEncoder(schema)
df.map {
case Row(year, make: String, model) if make.toLowerCase == "tesla" =>
Row(year, "S", model)
case row => row
} (encoder)
Для сценария, в котором заранее известна схема датафрейма, решение, заданное @zero323, является решением
но для сценария с динамической схемой / или передачи нескольких фреймов данных в общую функцию: следующий код работал для нас при переходе с 1.6.1 с 2.2.0
import org.apache.spark.sql.Row
val df = Seq(
(2012, "Tesla", "S"), (1997, "Ford", "E350"),
(2015, "Chevy", "Volt")
).toDF("year", "make", "model")
val data = df.rdd.map(row => {
val row1 = row.getAs[String](1)
val make = if (row1.toLowerCase == "tesla") "S" else row1
Row(row(0),make,row(2))
})
этот код выполняется на обеих версиях spark.
Недостаток: оптимизация, обеспечиваемая искрой, на базе данных / наборах данных не применяется.
Просто чтобы добавить несколько других важных моментов, чтобы хорошо понять другие ответы (особенно последний пункт ответа @zero323 оmap
над
Dataset[Row]
):
- Прежде всего,
Dataframe.map
дает вамDataset
(более конкретно,Dataset[T]
, скорее, чемDataset[Row]
)! - И
Dataset[T]
всегда требует кодировщика, вот что это за предложение "Dataset[Row].map
является((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]
" средства. - На самом деле существует множество кодировщиков, предопределенных Spark (которые могут быть
import
Эд, делаяimport spark.implicits._
), но, тем не менее, список не сможет охватить многие специфичные для предметной области типы, которые могут создать разработчики, и в этом случае вам необходимо создать кодировщики самостоятельно. - В конкретном примере на этой странице
df.map
возвращаетRow
тип дляDataset
, и подожди минутку,Row
type не входит в список типов, для которых кодировщики предопределены Spark, поэтому вы собираетесь создать его самостоятельно. - И я допускаю, что создание кодировщика для
Row
type немного отличается от подхода, описанного в приведенной выше ссылке, и вы должны использоватьRowEncoder
который беретStructType
как параметр, описывающий тип строки, например, что @zero323 предоставляет выше:
// this describes the internal type of a row
val schema = StructType(Seq(StructField("year", IntegerType), StructField("make", StringType), StructField("model", StringType)))
// and this completes the creation of encoder
// for the type `Row` with internal schema described above
val encoder = RowEncoder(schema)
В моем случае с версией Spark 2.4.4 мне пришлось импортировать имплициты. Это общий ответ
val spark2 = spark
import spark2.implicits._
val data = df.rdd.map(row => my_func(row))
где my_func выполнил какую-то операцию.