Как сделать хорошие воспроизводимые примеры Apache Spark Dataframe

Я потратил довольно много времени на чтение некоторых вопросов с помощью тегов pyspark и /questions/tagged/spark-dataframe, и очень часто я обнаруживаю, что постеры не предоставляют достаточно информации, чтобы по-настоящему понять их вопрос. Я обычно комментирую, прося их опубликовать MCVE, но иногда заставляя их показывать некоторые образцы входных / выходных данных, все равно что тянуть зубы. Например: см. Комментарии к этому вопросу.

Возможно, часть проблемы в том, что люди просто не знают, как легко создать MCVE для фреймов с искровыми данными. Я думаю, что было бы полезно иметь версию этого вопроса о пандах, основанную на искровых данных, в качестве руководства, которое можно связать.

Так как же создать хороший, воспроизводимый пример?

4 ответа

Решение

Предоставьте небольшие образцы данных, которые могут быть легко воссозданы.

По крайней мере, постеры должны содержать пару строк и столбцов в своем фрейме данных и код, который можно использовать для его простого создания. Под легким я имею в виду вырезать и вставить. Сделайте это как можно меньше, чтобы продемонстрировать свою проблему.


У меня есть следующий фрейм данных:

+-----+---+-----+----------+
|index|  X|label|      date|
+-----+---+-----+----------+
|    1|  1|    A|2017-01-01|
|    2|  3|    B|2017-01-02|
|    3|  5|    A|2017-01-03|
|    4|  7|    B|2017-01-04|
+-----+---+-----+----------+

который может быть создан с помощью этого кода:

df = sqlCtx.createDataFrame(
    [
        (1, 1, 'A', '2017-01-01'),
        (2, 3, 'B', '2017-01-02'),
        (3, 5, 'A', '2017-01-03'),
        (4, 7, 'B', '2017-01-04')
    ],
    ('index', 'X', 'label', 'date')
)

Показать желаемый результат.

Задайте свой конкретный вопрос и покажите нам желаемый результат.


Как я могу создать новый столбец 'is_divisible' это имеет значение 'yes' если день месяца 'date' плюс 7 дней делится на значение в столбце 'X' и 'no' иначе?

Желаемый результат:

+-----+---+-----+----------+------------+
|index|  X|label|      date|is_divisible|
+-----+---+-----+----------+------------+
|    1|  1|    A|2017-01-01|         yes|
|    2|  3|    B|2017-01-02|         yes|
|    3|  5|    A|2017-01-03|         yes|
|    4|  7|    B|2017-01-04|          no|
+-----+---+-----+----------+------------+

Объясните, как получить ваш вывод.

Объясните, очень подробно, как вы получаете желаемый результат. Это помогает показать пример расчета.


Например, в строке 1 X = 1 и дата = 2017-01-01. Добавление 7 дней до даты урожайности 2017-01-08. День месяца - 8, а так как 8 делится на 1, ответ - "да".

Аналогично, для последней строки X = 7 и даты = 2017-01-04. Добавление 7 к дате дает 11 как день месяца. Так как 11 % 7 не 0, ответ "нет".


Поделитесь своим существующим кодом.

Покажите нам, что вы сделали или попробовали, включая весь * код, даже если он не работает. Сообщите нам, где вы застряли, и если вы получили сообщение об ошибке, пожалуйста, включите сообщение об ошибке.

(* Вы можете пропустить код для создания контекста искры, но вы должны включить все операции импорта.)


Я знаю, как добавить новый столбец, который date плюс 7 дней, но у меня проблемы с получением дня месяца в виде целого числа.

from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))

Включить версии, импортировать и использовать подсветку синтаксиса


Для сообщений о настройке производительности, включите план выполнения

  • Полная информация в этом ответе написана пользователем user8371915.
  • Это помогает использовать стандартизированные имена для контекстов.

Разбор искровых выходных файлов

  • MaxU предоставил полезный код в этом ответе, чтобы помочь разобрать выходные файлы Spark в DataFrame.

Другие заметки.

  • Обязательно прочитайте, как спрашивать и Как создать пример Minimal, Complete и Verifiable.
  • Прочитайте другие ответы на этот вопрос, которые связаны выше.
  • Иметь хорошее, описательное название.
  • Будьте вежливы. Люди на ТА являются добровольцами, так что спрашивайте приятно.

Настройка производительности

Если вопрос связан с настройкой производительности, пожалуйста, включите следующую информацию.

План выполнения

Лучше всего включить расширенный план выполнения. В Python:

df.explain(True) 

В Скала:

df.explain(true)

или расширенный план выполнения со статистикой. В Python:

print(df._jdf.queryExecution().stringWithStats())

в Скале:

df.queryExecution.stringWithStats

Информация о режиме и кластере

  • mode - local, clientкластер.
  • Диспетчер кластеров (если применимо) - нет (локальный режим), автономно, YARN, Mesos, Kubernetes.
  • Основная информация о конфигурации (количество ядер, память исполнителя).

Информация о времени

slow является относительным, особенно если вы портируете нераспределенное приложение или ожидаете низкой задержки. Точные сроки для различных задач и этапов, могут быть получены из Spark UI (sc.uiWebUrl) jobs или Spark REST UI.

Используйте стандартизированные имена для контекстов

Использование установленных имен для каждого контекста позволяет нам быстро воспроизвести проблему.

  • sc - за SparkContext,
  • sqlContext - за SQLContext,
  • spark - за SparkSession,

Предоставить информацию о типе (Scala)

Мощный вывод типов - одна из самых полезных функций Scala, но она затрудняет анализ кода, вырванного из контекста. Даже если тип очевиден из контекста, лучше аннотировать переменные. предпочитать

val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))

над

val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))

Обычно используемые инструменты могут помочь вам:

  • spark-shell / Scala shell

    использование :t

    scala> val rdd = sc.textFile("README.md")
    rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
    
    scala> :t rdd
    org.apache.spark.rdd.RDD[String]
    
  • InteliJ Idea

    Используйте Alt + =

Хороший вопрос и ответ; некоторые дополнительные предложения:

Включите вашу версию Spark

Spark все еще развивается, хотя и не так быстро, как в дни 1.x. Всегда (но особенно если вы используете несколько более старую версию) хорошей идеей будет включить вашу рабочую версию. Лично я всегда начинаю свои ответы с:

spark.version
# u'2.2.0'

или же

sc.version
# u'2.2.0'

Включение вашей версии Python тоже никогда не является плохой идеей.


Включить весь ваш импорт

Если ваш вопрос касается не только Spark SQL и фреймов данных, например, если вы намереваетесь использовать свой фрейм данных в какой-либо операции машинного обучения, четко указывайте свои операции импорта - см. Этот вопрос, где импорт был добавлен в OP только после обширного обмена в (теперь удалены) комментарии (и оказалось, что эти неправильные операции импорта были основной причиной проблемы).

Почему это необходимо? Потому что, например, это LDA

from pyspark.mllib.clustering import LDA

отличается от этого LDA:

from pyspark.ml.clustering import LDA

первый - из старого API на основе RDD (ранее Spark MLlib), а второй - из нового API на основе данных (Spark ML).


Включить подсветку кода

Хорошо, я признаюсь, что это субъективно: я считаю, что вопросы PySpark не должны быть помечены как python по умолчанию; Дело в том, python тег автоматически подсвечивает код (и я считаю, что это основная причина для тех, кто использует его для вопросов PySpark). В любом случае, если вы согласны и вам все еще нужен хороший выделенный код, просто включите соответствующую директиву уценки:

<!-- language-all: lang-python -->

где-то в вашем посте, перед вашим первым фрагментом кода.

[ОБНОВЛЕНИЕ: я запросил автоматическую подсветку синтаксиса для pyspark а также sparkr Теги - приветствия приветствуются]

Эта небольшая вспомогательная функция может помочь проанализировать выходные файлы Spark в DataFrame:

PySpark:

from pyspark.sql.functions import *

def read_spark_output(file_path):
    step1 = spark.read \
             .option("header","true") \
             .option("inferSchema","true") \
             .option("delimiter","|") \
             .option("parserLib","UNIVOCITY") \
             .option("ignoreLeadingWhiteSpace","true") \
             .option("ignoreTrailingWhiteSpace","true") \
             .option("comment","+") \
             .csv("file://{}".format(file_path))
    # select not-null columns
    step2 = t.select([c for c in t.columns if not c.startswith("_")])
    # deal with 'null' string in column
    return step2.select(*[when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name) for col_name in step2.columns])

Scala:

// read Spark Output Fixed width table:
def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "UNIVOCITY")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)

  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)

  val columns = step2.columns
  columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}

Использование:

df = read_spark_output("file:///tmp/spark.out")

PS: для pyspark, eqNullSafe доступно из spark 2.3,

Другие вопросы по тегам