Могу ли я прочитать CSV, представленный в виде строки, в Apache Spark, используя spark-csv

Я знаю, как прочитать CSV-файл в искру с помощью spark-CSV ( https://github.com/databricks/spark-csv), но у меня уже есть CSV-файл, представленный в виде строки, и хотел бы преобразовать эту строку непосредственно в dataframe. Это возможно?

4 ответа

Решение

Обновление: Начиная с Spark 2.2.x, наконец-то появился правильный способ сделать это с помощью набора данных.

import org.apache.spark.sql.{Dataset, SparkSession}
val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()

import spark.implicits._
val csvData: Dataset[String] = spark.sparkContext.parallelize(
  """
    |id, date, timedump
    |1, "2014/01/01 23:00:01",1499959917383
    |2, "2014/11/31 12:40:32",1198138008843
  """.stripMargin.lines.toList).toDS()

val frame = spark.read.option("header", true).option("inferSchema",true).csv(csvData)
frame.show()
frame.printSchema()

Старые искровые версии

На самом деле вы можете, хотя он использует внутренние библиотеки и не очень широко рекламируется. Просто создайте и используйте свой собственный экземпляр CsvParser. Пример, который работает для меня на spark 1.6.0 и spark-csv_2.10-1.4.0 ниже

    import com.databricks.spark.csv.CsvParser

val csvData = """
|userid,organizationid,userfirstname,usermiddlename,userlastname,usertitle
|1,1,user1,m1,l1,mr
|2,2,user2,m2,l2,mr
|3,3,user3,m3,l3,mr
|""".stripMargin
val rdd = sc.parallelize(csvData.lines.toList)
val csvParser = new CsvParser()
  .withUseHeader(true)
  .withInferSchema(true)


val csvDataFrame: DataFrame = csvParser.csvRdd(sqlContext, rdd)

Принятый ответ не работал для меня в spark 2.2.0, но привел меня к тому, что мне нужно с csvData.lines.toList

val fileUrl = getClass.getResource(s"/file_in_resources.csv")
val stream = fileUrl.getContent.asInstanceOf[InputStream]
val streamString = Source.fromInputStream(stream).mkString

val csvList = streamString.lines.toList

spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(csvList.toDS())
  .as[SomeCaseClass]  

Вы можете проанализировать вашу строку в csv используя, например, scala-csv:

val myCSVdata : Array[List[String]] = myCSVString.split('\n').flatMap(CSVParser.parseLine(_))

Здесь вы можете сделать немного больше обработки, очистки данных, проверки правильности разбора каждой строки и наличия одинакового количества полей и т. Д.

Вы можете сделать это RDD записей:

val myCSVRDD : RDD[List[String]] = sparkContext.parallelize(msCSVdata)

Здесь вы можете втиснуть свои списки строк в класс case, чтобы лучше отражать поля ваших данных CSV. Вы должны получить вдохновение от творений Persons в этом примере:

https://spark.apache.org/docs/latest/sql-programming-guide.html

Я опускаю этот шаг.

Затем вы можете преобразовать в DataFrame:

import spark.implicits._ myCSVDataframe = myCSVRDD.toDF()

Это решение PySpark , с которым я недавно столкнулся и которое было успешным. Здесь я беру выходные данные консоли dataframe.show и создаю файл данных с помощью CSV API Spark.

Поскольку версия Scala уже существует, эта версия PySpark немного отличается от этой. Я использовал это для преобразования выходных данных консоли Impala /hive в CSV для моего модульного тестирования, и это было действительно полезно.

Я использовал регулярные выражения... удалить +-----+ типы строк

       re.sub(r'\n[+-]+\n' , '\n', input_data)
      import os
import re
import sys

from pyspark.sql import SparkSession

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# Initialize Spark session
spark = SparkSession.builder \
    .appName("String to CSV") \
    .getOrCreate()

# Input data as a string
input_data = """
+-----+------------------+-------+
|empid|empname           |salary|
|    1|    Ram Ghadiyaram| 10000|
+-----+-------+----------+--------+
""".replace("|\n","\n").replace("\n|","\n")

#remove +-----+-------+------+ from the string
input_data = re.sub(r'\n[+-]+\n' , '\n', input_data)
# Capture the input data as a string
df = spark.read.option("header","true").option("inferSchema","true").option("delimiter", "|").csv(spark.sparkContext.parallelize(input_data.split("\n")))
df.printSchema()
# Show the result CSV data
df.show()

Полное объяснение есть в моей статье .

Другие вопросы по тегам