Набор данных Spark 2.0 против DataFrame

Начиная со спарка 2.0.1 у меня возникли вопросы. Я прочитал много документации, но до сих пор не смог найти достаточных ответов:

  • В чем разница между
    • df.select("foo")
    • df.select($"foo")
  • правильно ли я понимаю, что
    • myDataSet.map(foo.someVal) является безопасным и не преобразуется в RDD но оставайтесь в представлении DataSet / без дополнительных накладных расходов (с точки зрения производительности для 2.0.0)
  • все остальные команды, например, select,.. являются просто синтаксическим сахаром. Они не безопасны, и вместо них можно использовать карту. Как я мог df.select("foo") безопасный тип без оператора карты?
    • почему я должен использовать UDF / UADF вместо карты (при условии, что карта остается в представлении набора данных)?

2 ответа

Решение
  1. Разница между df.select("foo") а также df.select($"foo") это подпись. Первый занимает по крайней мере один Stringчем позже один ноль или больше Columns, За этим нет практической разницы.
  2. myDataSet.map(foo.someVal) проверки типа, но как-нибудь Dataset операция использует RDD объектов, и по сравнению с DataFrame операции, есть значительные накладные расходы. Давайте посмотрим на простой пример:

    case class FooBar(foo: Int, bar: String)
    val ds = Seq(FooBar(1, "x")).toDS
    ds.map(_.foo).explain
    
    == Physical Plan ==
    *SerializeFromObject [input[0, int, true] AS value#123]
    +- *MapElements <function1>, obj#122: int
       +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
          +- LocalTableScan [foo#117, bar#118]
    

    Как видите, этот план выполнения требует доступа ко всем полям и должен DeserializeToObject,

  3. Нет. Как правило, другие методы не являются синтаксическим сахаром и генерируют существенно другой план выполнения. Например:

    ds.select($"foo").explain
    
    == Physical Plan ==
    LocalTableScan [foo#117]
    

    По сравнению с показанным планом, он может получить прямой доступ к колонке. Это не столько ограничение API, сколько результат различия в операционной семантике.

  4. Как я могу использовать df.select("foo") в безопасном виде без оператора map?

    Там нет такой опции. В то время как типизированные столбцы позволяют преобразовывать статически Dataset в другой статически типизированный Dataset:

    ds.select($"bar".as[Int])
    

    нет типа безопасных. Существуют и другие попытки включить оптимизированные по типу операции, такие как типизированные агрегации, но это экспериментальный API.

  5. почему я должен использовать UDF / UADF вместо карты

    Это полностью зависит от вас. Каждая распределенная структура данных в Spark имеет свои преимущества и недостатки (см., Например, Spark UDAF с ArrayType в качестве проблем с производительностью bufferSchema).

Лично я нахожу статически типизированным Dataset быть наименее полезным:

  • Не предоставляйте тот же диапазон оптимизации, что и Dataset[Row] (хотя они совместно используют формат хранения и некоторые оптимизации плана выполнения, он не в полной мере извлекает выгоду из генерации кода или хранения вне кучи), а также не имеет доступа ко всем аналитическим возможностям DataFrame,

  • Типизированные преобразования являются черными ящиками и эффективно создают барьер для анализа для оптимизатора. Например, выборки (фильтры) не могут быть перенесены на типизированное преобразование:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
    
    == Physical Plan ==
    *Filter (foo#133 = 1)
    +- *Filter <function1>.apply
       +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
          +- Exchange hashpartitioning(foo#133, 200)
             +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
                +- LocalTableScan [foo#133, bar#134]
    

    По сравнению с:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
    
    == Physical Plan ==
    *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
    +- Exchange hashpartitioning(foo#133, 200)
       +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
          +- *Filter (foo#133 = 1)
             +- LocalTableScan [foo#133, bar#134] 
    

    Это влияет на такие функции, как предикатное нажатие или проекционное нажатие.

  • Там не так гибко, как RDDs только небольшое подмножество типов поддерживается изначально.

  • "Тип безопасности" с Encoders спорно, когда Dataset конвертируется с помощью as метод. Поскольку форма данных не закодирована с использованием сигнатуры, компилятор может только проверить существование Encoder,

Смежные вопросы:

Искра Dataset намного мощнее, чем Spark Dataframe, Небольшой пример - вы можете только создать Dataframe из Row, Tuple или любой примитивный тип данных, но Dataset дает вам силу создавать Dataset любого непримитивного типа тоже. т.е. вы можете буквально создавать Dataset типа объекта.

Пример:

case class Employee(id:Int,name:String)

Dataset[Employee]   // is valid
Dataframe[Employee] // is invalid

DATAFRAME: DataFrame - это абстракция, которая позволяет просматривать данные в виде схемы.

case class Person(имя: String, возраст: Int, адрес: String)

определенный класс Person

scala> val df = List (Person ("Sumanth", 23, "BNG")

ФРАМ ДАННЫХ ПРОТИВ НАБОРА ДАННЫХ

НАБОР ДАННЫХ: набор данных - это расширение Dataframe API, последней абстракции, которая пытается предоставить лучшее как от RDD, так и от Dataframe.

Другие вопросы по тегам