Спарк производительности для Scala против Python

Я предпочитаю Python, а не Scala. Но, поскольку Spark изначально написан на Scala, я ожидал, что мой код будет работать быстрее в Scala, чем версия Python по понятным причинам.

Исходя из этого предположения, я подумал изучить и написать Scala-версию очень распространенного кода предварительной обработки для 1 ГБ данных. Данные взяты из конкурса SpringLeaf на Kaggle. Просто, чтобы дать обзор данных (он содержит 1936 измерений и 145232 строк). Данные состоят из различных типов, например, int, float, string, boolean. Я использую 6 ядер из 8 для обработки Spark; вот почему я использовал minPartitions=6 так что каждое ядро ​​должно что-то обрабатывать.

Скала код

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Код Python

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Скала Performance Stage 0 (38 минут), Этап 1 (18 секунд) введите описание изображения здесь

Стадияисполнения Python0 (11 минут), стадия 1 (7 секунд) введите описание изображения здесь

Оба дают разные графики визуализации DAG (из-за которых на обоих рисунках показаны разные функции этапа 0 для Scala (map) и Python (reduceByKey))

Но, по сути, оба кода пытаются преобразовать данные в (размер_идентификатора, строку списка значений) и сохранить на диск. Вывод будет использоваться для вычисления различной статистики для каждого измерения.

С точки зрения производительности, Scala-код для таких реальных данных, как этот, кажется, работает в 4 раза медленнее, чем версия Python. Хорошей новостью для меня является то, что это дало мне хорошую мотивацию, чтобы остаться с Python. Плохая новость - я не совсем понял, почему?

1 ответ


Оригинальный ответ с обсуждением кода можно найти ниже.


Прежде всего, вы должны различать разные типы API, каждый со своими соображениями производительности.

RDD API

(чистые структуры Python с оркестровкой на основе JVM)

Это компонент, который больше всего будет зависеть от производительности кода Python и деталей реализации PySpark. Несмотря на то, что производительность Python вряд ли будет проблемой, необходимо учитывать как минимум несколько факторов:

  • Накладные расходы на коммуникацию JVM. Практически все данные, поступающие к исполнителю Python и от него, должны передаваться через сокет и JVM-работника. Хотя это относительно эффективное местное общение, оно все же не является бесплатным.
  • Исполнители на основе процессов (Python) и исполнители на основе потоков (один JVM-несколько потоков) (Scala). Каждый исполнитель Python работает в своем собственном процессе. Как побочный эффект, он обеспечивает более сильную изоляцию, чем его аналог JVM, и некоторый контроль над жизненным циклом исполнителя, но потенциально значительно более высокое использование памяти:

    • след памяти переводчика
    • след загруженных библиотек
    • менее эффективное вещание (каждому процессу требуется собственная копия вещания)
  • Производительность самого кода Python. Вообще говоря, Scala быстрее, чем Python, но зависит от задачи к задаче. Более того, у вас есть несколько вариантов, включая JIT, такие как Numba, расширения C ( Cython) или специализированные библиотеки, такие как Theano. Наконец, если вы не используете ML / MLlib (или просто стек NumPy), рассмотрите возможность использования PyPy в качестве альтернативного интерпретатора. Смотрите SPARK-3094.

  • Конфигурация PySpark обеспечивает spark.python.worker.reuse опция, которую можно использовать для выбора между разветвлением процесса Python для каждой задачи и повторным использованием существующего процесса. Последний вариант представляется полезным, чтобы избежать дорогостоящей сборки мусора (это скорее впечатление, чем результат систематических тестов), тогда как первый (по умолчанию) является оптимальным для дорогостоящих широковещательных рассылок и импорта.
  • Подсчет ссылок, используемый в CPython в качестве метода сбора мусора первой строки, довольно хорошо работает с типичными рабочими нагрузками Spark (потоковая обработка, без циклов ссылок) и снижает риск длительных пауз GC.

MLlib

(смешанное выполнение Python и JVM)

Основные соображения почти такие же, как и раньше, с некоторыми дополнительными проблемами. В то время как базовые структуры, используемые с MLlib, являются простыми объектами RDD Python, все алгоритмы выполняются напрямую с использованием Scala.

Это означает дополнительную стоимость преобразования объектов Python в объекты Scala и наоборот, увеличение использования памяти и некоторые дополнительные ограничения, которые мы рассмотрим позже.

На данный момент (Spark 2.x) API на основе RDD находится в режиме обслуживания и планируется удалить в Spark 3.0.

DataFrame API и Spark ML

(Выполнение JVM с кодом Python ограничено драйвером)

Это, вероятно, лучший выбор для стандартных задач обработки данных. Поскольку код Python в основном ограничивается высокоуровневыми логическими операциями на драйвере, не должно быть различий в производительности между Python и Scala.

Единственное исключение - использование построчных пользовательских функций Python, которые значительно менее эффективны, чем их эквиваленты в Scala. Хотя есть некоторые возможности для улучшений (в Spark 2.0.0 произошли существенные изменения), самым большим ограничением является полный переход между внутренним представлением (JVM) и интерпретатором Python. Если возможно, вы должны предпочесть композицию встроенных выражений ( например. Поведение Python UDF было улучшено в Spark 2.0.0, но оно все еще неоптимально по сравнению с собственным выполнением. Это может улучшиться в будущем с введением векторизованных UDF (СПАРК-21190).

Также обязательно избегайте ненужной передачи данных между DataFrames а также RDDs, Это требует дорогой сериализации и десериализации, не говоря уже о передаче данных в и из интерпретатора Python.

Стоит отметить, что у вызовов Py4J довольно высокая задержка. Это включает в себя простые звонки, такие как:

from pyspark.sql.functions import col

col("foo")

Как правило, это не должно иметь значения (накладные расходы постоянны и не зависят от объема данных), но в случае мягких приложений реального времени вы можете рассмотреть возможность кэширования / повторного использования оболочек Java.

GraphX ​​и Spark DataSets

На данный момент (Spark 1.6 2.1) ни один из них не предоставляет PySpark API, так что вы можете сказать, что PySpark бесконечно хуже, чем Scala.

Graphx

На практике разработка GraphX ​​остановилась почти полностью, и проект в настоящее время находится в режиме обслуживания с закрытыми билетами JIRA, которые не будут исправлены. Библиотека GraphFrames предоставляет альтернативную библиотеку обработки графов с привязками Python.

Dataset

Субъективно говоря, не так много места для статически типизированных Datasets в Python, и даже если бы существовала текущая реализация Scala, она слишком упрощенная и не дает тех же преимуществ с точки зрения производительности, что и DataFrame,

Streaming

Из того, что я видел до сих пор, я настоятельно рекомендую использовать Scala поверх Python. Это может измениться в будущем, если PySpark получит поддержку для структурированных потоков, но сейчас Scala API кажется гораздо более надежным, всеобъемлющим и эффективным. Мой опыт довольно ограничен.

Структурированные потоковые потоки в Spark 2.x, похоже, сокращают разрыв между языками, но пока он все еще находится на раннем этапе. Тем не менее, API, основанный на RDD, уже упоминается как "устаревшая потоковая передача" в Документации блоков данных (дата доступа 2017-03-03), поэтому разумно ожидать дальнейших усилий по объединению.

Неэффективные соображения

Характеристика паритета

Не все функции Spark предоставляются через API PySpark. Не забудьте проверить, что необходимые компоненты уже выполнены, и попытаться понять возможные ограничения.

Это особенно важно, когда вы используете MLlib и аналогичные смешанные контексты (см. Вызов функции Java/Scala из задачи). Чтобы быть справедливым некоторые части PySpark API, такие как mllib.linalg, предоставляет более полный набор методов, чем Scala.

API дизайн

PySpark API близко отражает своего аналога Scala и поэтому не совсем Pythonic. Это означает, что довольно просто отобразить языки, но в то же время код Python может быть значительно сложнее для понимания.

Сложная архитектура

Поток данных PySpark относительно сложен по сравнению с чистым выполнением JVM. Гораздо сложнее рассуждать о программах PySpark или их отладке. Более того, по крайней мере, базовое понимание Scala и JVM в целом является обязательным условием.

Spark 2.x и выше

Продолжающийся сдвиг в сторону Dataset API с замороженным RDD API приносит как возможности, так и проблемы для пользователей Python. В то время как высокоуровневые части API гораздо проще раскрыть в Python, более сложные функции практически невозможно использовать напрямую.

Более того, нативные функции Python продолжают оставаться гражданами второго сорта в мире SQL. Надеемся, что это улучшится в будущем благодаря сериализации Apache Arrow ( текущие усилия направлены на данныеcollectionно UDF serde - это долгосрочная цель).

Для проектов, сильно зависящих от кодовой базы Python, чистые альтернативы Python (такие как Dask или Ray) могут быть интересной альтернативой.

Это не должно быть один против другого

API Spark DataFrame (SQL, Dataset) предоставляет элегантный способ интеграции кода Scala/Java в приложение PySpark. Ты можешь использовать DataFrames выставить данные в собственный код JVM и прочитать результаты обратно. Я объяснил некоторые варианты где-то еще, и вы можете найти рабочий пример Python-Scala туда и обратно в разделе Как использовать класс Scala внутри Pyspark.

Это может быть дополнительно дополнено введением пользовательских типов (см. Как определить схему для пользовательского типа в Spark SQL?).


Что не так с кодом, указанным в вопросе

(Отказ от ответственности: точка зрения Pythonista. Скорее всего, я пропустил некоторые трюки Scala)

Прежде всего, в вашем коде есть одна часть, которая вообще не имеет смысла. Если у вас уже есть (key, value) пары, созданные с использованием zipWithIndex или же enumerate какой смысл создавать строку, чтобы потом ее сразу разделить? flatMap не работает рекурсивно, так что вы можете просто получить кортежи и пропустить следующие map бы то ни было.

Другая часть, которую я считаю проблематичной, reduceByKey, Вообще говоря, reduceByKey полезно, если применение агрегатной функции может уменьшить объем данных, которые необходимо перетасовать. Поскольку вы просто объединяете строки, здесь нечего получить. Игнорируя низкоуровневые вещи, такие как количество ссылок, объем данных, которые вы должны передать, точно такой же, как и для groupByKey,

Обычно я не буду останавливаться на этом, но, насколько я могу судить, это узкое место в вашем коде Scala. Присоединение строк в JVM - довольно дорогая операция (см., Например: Является ли объединение строк в Scala таким же дорогостоящим, как и в Java?). Значит что то подобное _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2) что эквивалентно input4.reduceByKey(valsConcat) в вашем коде не очень хорошая идея.

Если вы хотите избежать groupByKey Вы можете попробовать использовать aggregateByKey с StringBuilder, Нечто подобное должно сделать свое дело:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)

но я сомневаюсь, что это стоит всей суеты.

Учитывая вышесказанное, я переписал ваш код следующим образом:

Скала:

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")

Python:

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

Результаты

В local[6] в режиме (Intel(R) Xeon(R) CPU E3-1245 V2 @ 3,40 ГГц) с 4 ГБ памяти на каждого исполнителя (n = 3):

  • Scala - среднее значение: 250,00 с, стандартное значение: 12,49
  • Python - среднее значение: 246,66 с, стандартное равенство: 1,15

Я почти уверен, что большую часть этого времени тратится на перемешивание, сериализацию, десериализацию и другие второстепенные задачи. Просто для удовольствия, вот наивный однопоточный код в Python, который выполняет ту же задачу на этом компьютере менее чем за минуту:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])

Расширение к приведенным выше ответам -

Scala оказывается быстрее во многих отношениях по сравнению с python, но есть несколько веских причин, по которым python становится более популярным, чем scala, позвольте увидеть некоторые из них -

Python для Apache Spark довольно прост в освоении и использовании. Однако это не единственная причина, по которой Pyspark лучше, чем Scala. Есть больше.

Python API для Spark может работать медленнее в кластере, но, в конце концов, специалисты по обработке данных могут сделать с ним гораздо больше, чем со Scala. Сложность Scala отсутствует. Интерфейс простой и понятный.

Говорить о читабельности кода, обслуживании и знакомстве с Python API для Apache Spark намного лучше, чем Scala.

Python поставляется с несколькими библиотеками, связанными с машинным обучением и обработкой естественного языка. Это помогает в анализе данных, а также дает статистику, которая является достаточно зрелой и проверенной временем. Например, numpy, pandas, scikit-learn, seaborn и matplotlib.

Примечание. Большинство специалистов по обработке данных используют гибридный подход, в котором они используют лучшее из обоих API.

Наконец, сообщество Scala часто оказывается гораздо менее полезным для программистов. Это делает Python очень ценным обучением. Если у вас есть достаточный опыт работы с любым языком программирования со статической типизацией, таким как Java, вы можете вообще не беспокоиться о том, чтобы не использовать Scala.

Другие вопросы по тегам