Получить CSV для Spark DataFrame

Я использую Python на Spark и хотел бы получить CSV в dataframe.

Документация по Spark SQL, как ни странно, не дает объяснения CSV в качестве источника.

Я нашел Spark-CSV, но у меня есть проблемы с двумя частями документации:

  • "This package can be added to Spark using the --jars command line option. For example, to include it when starting the spark shell: $ bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3"Мне действительно нужно добавлять этот аргумент каждый раз, когда я запускаю pyspark или spark-submit? Это кажется очень не элегантным. Разве нет способа импортировать его в python вместо повторной загрузки каждый раз?

  • df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv") Даже если я сделаю выше, это не сработает. Что означает аргумент "источник" в этой строке кода? Как мне просто загрузить локальный файл в Linux, скажем "/Spark_Hadoop/spark-1.3.1-bin-cdh4/cars.csv"?

8 ответов

Решение

Считайте файл csv в RDD, а затем сгенерируйте RowRDD из исходного RDD.

Создайте схему, представленную StructType, которая соответствует структуре строк в RDD, созданной на шаге 1.

Примените схему к RDD Rows с помощью метода createDataFrame, предоставленного SQLContext.

lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

источник: СПАРК РУКОВОДСТВО ПО ПРОГРАММИРОВАНИЮ

С более свежими версиями Spark (начиная с версии 1.4) это стало намного проще. Выражение sqlContext.read дает вам DataFrameReader Например, с .csv() метод:

df = sqlContext.read.csv("/path/to/your.csv")

Обратите внимание, что вы также можете указать, что файл CSV имеет заголовок, добавив ключевое слово аргумент header=True к .csv() вызов. Несколько других опций доступны и описаны по ссылке выше.

from pyspark.sql.types import StringType
from pyspark import SQLContext
sqlContext = SQLContext(sc)

Employee_rdd = sc.textFile("\..\Employee.csv")
               .map(lambda line: line.split(","))

Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name'])

Employee_df.show()

для Pyspark, предполагая, что первая строка файла CSV содержит заголовок

spark = SparkSession.builder.appName('chosenName').getOrCreate()
df=spark.read.csv('fileNameWithPath', mode="DROPMALFORMED",inferSchema=True, header = True)

Если вы не против дополнительной зависимости пакета, вы можете использовать Pandas для анализа файла CSV. Он прекрасно обрабатывает внутренние запятые.

зависимости:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

Читайте весь файл сразу в DataFrame Spark:

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# If no header:
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) 
s_df = sql_sc.createDataFrame(pandas_df)

Или, что еще важнее, вы можете объединить данные в Spark RDD, а не в DF:

chunk_100k = pd.read_csv('file.csv', chunksize=100000)

for chunky in chunk_100k:
    Spark_temp_rdd = sc.parallelize(chunky.values.tolist())
    try:
        Spark_full_rdd += Spark_temp_rdd
    except NameError:
        Spark_full_rdd = Spark_temp_rdd
    del Spark_temp_rdd

Spark_DF = Spark_full_rdd.toDF(['column 1','column 2'])

После Spark 2.0 рекомендуется использовать Spark Session:

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a SparkSession
spark = SparkSession \
    .builder \
    .appName("basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

def mapper(line):
    fields = line.split(',')
    return Row(ID=int(fields[0]), field1=str(fields[1].encode("utf-8")), field2=int(fields[2]), field3=int(fields[3]))

lines = spark.sparkContext.textFile("file.csv")
df = lines.map(mapper)

# Infer the schema, and register the DataFrame as a table.
schemaDf = spark.createDataFrame(df).cache()
schemaDf.createOrReplaceTempView("tablename")

С текущей реализацией (искра 2.X) вам не нужно добавлять аргумент пакетов, вы можете использовать встроенную реализацию csv

Кроме того, в качестве принятого ответа вам не нужно создавать rdd, а затем применять схему, которая имеет 1 потенциальную проблему

Когда вы читаете csv as, он помечает все поля как строку, а когда вы применяете схему с целочисленным столбцом, вы получите исключение.

Лучшим способом сделать это было бы

 spark.read.format("csv").schema(schema).option("header", "true").load(input_path).show() 

Основано на ответе Аравинда, но намного короче, например:

lines = sc.textFile("/path/to/file").map(lambda x: x.split(","))
df = lines.toDF(["year", "month", "day", "count"])

Я столкнулся с подобной проблемой. Решение состоит в том, чтобы добавить переменную среды с именем "PYSPARK_SUBMIT_ARGS" и установить ее значение "--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell". Это работает с интерактивной оболочкой Spark's Python.

Убедитесь, что вы соответствуете версии spark-csv с установленной версией Scala. В Scala 2.11 это spark-csv_2.11, а в Scala 2.10 или 2.10.5 - spark-csv_2.10.

Надеюсь, что это работает.

Другие вопросы по тегам