Как сделать хорошие воспроизводимые примеры Apache Spark Dataframe
Я потратил довольно много времени на чтение некоторых вопросов с помощью тегов pyspark и /questions/tagged/spark-dataframe, и очень часто я обнаруживаю, что постеры не предоставляют достаточно информации, чтобы по-настоящему понять их вопрос. Я обычно комментирую, прося их опубликовать MCVE, но иногда заставляя их показывать некоторые образцы входных / выходных данных, все равно что тянуть зубы. Например: см. Комментарии к этому вопросу.
Возможно, часть проблемы в том, что люди просто не знают, как легко создать MCVE для фреймов с искровыми данными. Я думаю, что было бы полезно иметь версию этого вопроса о пандах, основанную на искровых данных, в качестве руководства, которое можно связать.
Так как же создать хороший, воспроизводимый пример?
4 ответа
Предоставьте небольшие образцы данных, которые могут быть легко воссозданы.
По крайней мере, постеры должны содержать пару строк и столбцов в своем фрейме данных и код, который можно использовать для его простого создания. Под легким я имею в виду вырезать и вставить. Сделайте это как можно меньше, чтобы продемонстрировать свою проблему.
У меня есть следующий фрейм данных:
+-----+---+-----+----------+
|index| X|label| date|
+-----+---+-----+----------+
| 1| 1| A|2017-01-01|
| 2| 3| B|2017-01-02|
| 3| 5| A|2017-01-03|
| 4| 7| B|2017-01-04|
+-----+---+-----+----------+
который может быть создан с помощью этого кода:
df = sqlCtx.createDataFrame(
[
(1, 1, 'A', '2017-01-01'),
(2, 3, 'B', '2017-01-02'),
(3, 5, 'A', '2017-01-03'),
(4, 7, 'B', '2017-01-04')
],
('index', 'X', 'label', 'date')
)
Показать желаемый результат.
Задайте свой конкретный вопрос и покажите нам желаемый результат.
Как я могу создать новый столбец 'is_divisible'
это имеет значение 'yes'
если день месяца 'date'
плюс 7 дней делится на значение в столбце 'X'
и 'no'
иначе?
Желаемый результат:
+-----+---+-----+----------+------------+
|index| X|label| date|is_divisible|
+-----+---+-----+----------+------------+
| 1| 1| A|2017-01-01| yes|
| 2| 3| B|2017-01-02| yes|
| 3| 5| A|2017-01-03| yes|
| 4| 7| B|2017-01-04| no|
+-----+---+-----+----------+------------+
Объясните, как получить ваш вывод.
Объясните, очень подробно, как вы получаете желаемый результат. Это помогает показать пример расчета.
Например, в строке 1 X = 1 и дата = 2017-01-01. Добавление 7 дней до даты урожайности 2017-01-08. День месяца - 8, а так как 8 делится на 1, ответ - "да".
Аналогично, для последней строки X = 7 и даты = 2017-01-04. Добавление 7 к дате дает 11 как день месяца. Так как 11 % 7 не 0, ответ "нет".
Поделитесь своим существующим кодом.
Покажите нам, что вы сделали или попробовали, включая весь * код, даже если он не работает. Сообщите нам, где вы застряли, и если вы получили сообщение об ошибке, пожалуйста, включите сообщение об ошибке.
(* Вы можете пропустить код для создания контекста искры, но вы должны включить все операции импорта.)
Я знаю, как добавить новый столбец, который date
плюс 7 дней, но у меня проблемы с получением дня месяца в виде целого числа.
from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))
Включить версии, импортировать и использовать подсветку синтаксиса
- Полная информация в этом ответе написана desertnaut.
Для сообщений о настройке производительности, включите план выполнения
- Полная информация в этом ответе написана пользователем user8371915.
- Это помогает использовать стандартизированные имена для контекстов.
Разбор искровых выходных файлов
- MaxU предоставил полезный код в этом ответе, чтобы помочь разобрать выходные файлы Spark в DataFrame.
Другие заметки.
- Обязательно прочитайте, как спрашивать и Как создать пример Minimal, Complete и Verifiable.
- Прочитайте другие ответы на этот вопрос, которые связаны выше.
- Иметь хорошее, описательное название.
- Будьте вежливы. Люди на ТА являются добровольцами, так что спрашивайте приятно.
Настройка производительности
Если вопрос связан с настройкой производительности, пожалуйста, включите следующую информацию.
План выполнения
Лучше всего включить расширенный план выполнения. В Python:
df.explain(True)
В Скала:
df.explain(true)
или расширенный план выполнения со статистикой. В Python:
print(df._jdf.queryExecution().stringWithStats())
в Скале:
df.queryExecution.stringWithStats
Информация о режиме и кластере
mode
-local
,client
кластер.- Диспетчер кластеров (если применимо) - нет (локальный режим), автономно, YARN, Mesos, Kubernetes.
- Основная информация о конфигурации (количество ядер, память исполнителя).
Информация о времени
slow является относительным, особенно если вы портируете нераспределенное приложение или ожидаете низкой задержки. Точные сроки для различных задач и этапов, могут быть получены из Spark UI (sc.uiWebUrl
) jobs
или Spark REST UI.
Используйте стандартизированные имена для контекстов
Использование установленных имен для каждого контекста позволяет нам быстро воспроизвести проблему.
sc
- заSparkContext
,sqlContext
- заSQLContext
,spark
- заSparkSession
,
Предоставить информацию о типе (Scala)
Мощный вывод типов - одна из самых полезных функций Scala, но она затрудняет анализ кода, вырванного из контекста. Даже если тип очевиден из контекста, лучше аннотировать переменные. предпочитать
val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))
над
val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))
Обычно используемые инструменты могут помочь вам:
spark-shell
/ Scala shellиспользование
:t
scala> val rdd = sc.textFile("README.md") rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24 scala> :t rdd org.apache.spark.rdd.RDD[String]
InteliJ Idea
Используйте Alt + =
Хороший вопрос и ответ; некоторые дополнительные предложения:
Включите вашу версию Spark
Spark все еще развивается, хотя и не так быстро, как в дни 1.x. Всегда (но особенно если вы используете несколько более старую версию) хорошей идеей будет включить вашу рабочую версию. Лично я всегда начинаю свои ответы с:
spark.version
# u'2.2.0'
или же
sc.version
# u'2.2.0'
Включение вашей версии Python тоже никогда не является плохой идеей.
Включить весь ваш импорт
Если ваш вопрос касается не только Spark SQL и фреймов данных, например, если вы намереваетесь использовать свой фрейм данных в какой-либо операции машинного обучения, четко указывайте свои операции импорта - см. Этот вопрос, где импорт был добавлен в OP только после обширного обмена в (теперь удалены) комментарии (и оказалось, что эти неправильные операции импорта были основной причиной проблемы).
Почему это необходимо? Потому что, например, это LDA
from pyspark.mllib.clustering import LDA
отличается от этого LDA:
from pyspark.ml.clustering import LDA
первый - из старого API на основе RDD (ранее Spark MLlib), а второй - из нового API на основе данных (Spark ML).
Включить подсветку кода
Хорошо, я признаюсь, что это субъективно: я считаю, что вопросы PySpark не должны быть помечены как python
по умолчанию; Дело в том, python
тег автоматически подсвечивает код (и я считаю, что это основная причина для тех, кто использует его для вопросов PySpark). В любом случае, если вы согласны и вам все еще нужен хороший выделенный код, просто включите соответствующую директиву уценки:
<!-- language-all: lang-python -->
где-то в вашем посте, перед вашим первым фрагментом кода.
[ОБНОВЛЕНИЕ: я запросил автоматическую подсветку синтаксиса для pyspark
а также sparkr
Теги - приветствия приветствуются]
Эта небольшая вспомогательная функция может помочь проанализировать выходные файлы Spark в DataFrame:
PySpark:
from pyspark.sql.functions import *
def read_spark_output(file_path):
step1 = spark.read \
.option("header","true") \
.option("inferSchema","true") \
.option("delimiter","|") \
.option("parserLib","UNIVOCITY") \
.option("ignoreLeadingWhiteSpace","true") \
.option("ignoreTrailingWhiteSpace","true") \
.option("comment","+") \
.csv("file://{}".format(file_path))
# select not-null columns
step2 = t.select([c for c in t.columns if not c.startswith("_")])
# deal with 'null' string in column
return step2.select(*[when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name) for col_name in step2.columns])
Scala:
// read Spark Output Fixed width table:
def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
val step1 = spark.read
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", "|")
.option("parserLib", "UNIVOCITY")
.option("ignoreLeadingWhiteSpace", "true")
.option("ignoreTrailingWhiteSpace", "true")
.option("comment", "+")
.csv(filePath)
val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)
val columns = step2.columns
columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}
Использование:
df = read_spark_output("file:///tmp/spark.out")
PS: для pyspark, eqNullSafe
доступно из spark 2.3
,