Ошибка кодера при попытке сопоставить строку информационного кадра с обновленной строкой

Когда я пытаюсь сделать то же самое в моем коде, как указано ниже

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

Я взял вышеупомянутую ссылку отсюда: Scala: Как я могу заменить значение в Dataframs, используя scala, но я получаю ошибку кодировщика как

Невозможно найти кодировщик для типа, хранящегося в наборе данных. Примитивные типы (Int, S tring и т. Д.) И типы Product (классы дел) поддерживаются путем импорта spark.im plicits._ Поддержка сериализации других типов будет добавлена ​​в будущих выпусках.

Примечание: я использую spark 2.0!

2 ответа

Решение

Здесь нет ничего неожиданного. Вы пытаетесь использовать код, который был написан с помощью Spark 1.x и больше не поддерживается в Spark 2.0:

  • в 1.х DataFrame.map является ((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
  • в 2.х Dataset[Row].map является ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]

Честно говоря, в 1.х тоже не было особого смысла. Независимо от версии вы можете просто использовать DataFrame API:

import org.apache.spark.sql.functions.{when, lower}

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))

Если вы действительно хотите использовать map вы должны использовать статически типизированный Dataset:

import spark.implicits._

case class Record(year: Int, make: String, model: String)

df.as[Record].map {
  case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
  case rec => rec
}

или, по крайней мере, вернуть объект, который будет иметь неявный кодер:

df.map {
  case Row(year: Int, make: String, model: String) => 
    (year, if(make.toLowerCase == "tesla") "S" else make, model)
}

Наконец, если по какой-то совершенно безумной причине вы действительно хотите нанести на карту Dataset[Row] Вы должны предоставить необходимый кодер:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
  StructField("year", IntegerType),
  StructField("make", StringType),
  StructField("model", StringType)
))

val encoder = RowEncoder(schema)

df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" => 
    Row(year, "S", model)
  case row => row
} (encoder)

Для сценария, в котором заранее известна схема датафрейма, решение, заданное @zero323, является решением

но для сценария с динамической схемой / или передачи нескольких фреймов данных в общую функцию: следующий код работал для нас при переходе с 1.6.1 с 2.2.0

import org.apache.spark.sql.Row

val df = Seq(
   (2012, "Tesla", "S"), (1997, "Ford", "E350"),
   (2015, "Chevy", "Volt")
 ).toDF("year", "make", "model")

val data = df.rdd.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

этот код выполняется на обеих версиях spark.

Недостаток: оптимизация, обеспечиваемая искрой, на базе данных / наборах данных не применяется.

Просто чтобы добавить несколько других важных моментов, чтобы хорошо понять другие ответы (особенно последний пункт ответа @zero323 оmap над Dataset[Row]):

  • Прежде всего, Dataframe.map дает вам Dataset (более конкретно, Dataset[T], скорее, чем Dataset[Row])!
  • И Dataset[T] всегда требует кодировщика, вот что это за предложение "Dataset[Row].map является ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]" средства.
  • На самом деле существует множество кодировщиков, предопределенных Spark (которые могут быть importЭд, делая import spark.implicits._), но, тем не менее, список не сможет охватить многие специфичные для предметной области типы, которые могут создать разработчики, и в этом случае вам необходимо создать кодировщики самостоятельно.
  • В конкретном примере на этой странице df.map возвращает Row тип для Dataset, и подожди минутку, Row type не входит в список типов, для которых кодировщики предопределены Spark, поэтому вы собираетесь создать его самостоятельно.
  • И я допускаю, что создание кодировщика для Rowtype немного отличается от подхода, описанного в приведенной выше ссылке, и вы должны использовать RowEncoder который берет StructType как параметр, описывающий тип строки, например, что @zero323 предоставляет выше:
// this describes the internal type of a row
val schema = StructType(Seq(StructField("year", IntegerType), StructField("make", StringType), StructField("model", StringType)))

// and this completes the creation of encoder
// for the type `Row` with internal schema described above
val encoder = RowEncoder(schema)

В моем случае с версией Spark 2.4.4 мне пришлось импортировать имплициты. Это общий ответ

val spark2 = spark
import spark2.implicits._

val data = df.rdd.map(row => my_func(row))

где my_func выполнил какую-то операцию.

Другие вопросы по тегам