Могу ли я прочитать CSV, представленный в виде строки, в Apache Spark, используя spark-csv
Я знаю, как прочитать CSV-файл в искру с помощью spark-CSV ( https://github.com/databricks/spark-csv), но у меня уже есть CSV-файл, представленный в виде строки, и хотел бы преобразовать эту строку непосредственно в dataframe. Это возможно?
4 ответа
Обновление: Начиная с Spark 2.2.x, наконец-то появился правильный способ сделать это с помощью набора данных.
import org.apache.spark.sql.{Dataset, SparkSession}
val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()
import spark.implicits._
val csvData: Dataset[String] = spark.sparkContext.parallelize(
"""
|id, date, timedump
|1, "2014/01/01 23:00:01",1499959917383
|2, "2014/11/31 12:40:32",1198138008843
""".stripMargin.lines.toList).toDS()
val frame = spark.read.option("header", true).option("inferSchema",true).csv(csvData)
frame.show()
frame.printSchema()
Старые искровые версии
На самом деле вы можете, хотя он использует внутренние библиотеки и не очень широко рекламируется. Просто создайте и используйте свой собственный экземпляр CsvParser. Пример, который работает для меня на spark 1.6.0 и spark-csv_2.10-1.4.0 ниже
import com.databricks.spark.csv.CsvParser
val csvData = """
|userid,organizationid,userfirstname,usermiddlename,userlastname,usertitle
|1,1,user1,m1,l1,mr
|2,2,user2,m2,l2,mr
|3,3,user3,m3,l3,mr
|""".stripMargin
val rdd = sc.parallelize(csvData.lines.toList)
val csvParser = new CsvParser()
.withUseHeader(true)
.withInferSchema(true)
val csvDataFrame: DataFrame = csvParser.csvRdd(sqlContext, rdd)
Принятый ответ не работал для меня в spark 2.2.0, но привел меня к тому, что мне нужно с csvData.lines.toList
val fileUrl = getClass.getResource(s"/file_in_resources.csv")
val stream = fileUrl.getContent.asInstanceOf[InputStream]
val streamString = Source.fromInputStream(stream).mkString
val csvList = streamString.lines.toList
spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(csvList.toDS())
.as[SomeCaseClass]
Вы можете проанализировать вашу строку в csv используя, например, scala-csv:
val myCSVdata : Array[List[String]] =
myCSVString.split('\n').flatMap(CSVParser.parseLine(_))
Здесь вы можете сделать немного больше обработки, очистки данных, проверки правильности разбора каждой строки и наличия одинакового количества полей и т. Д.
Вы можете сделать это RDD
записей:
val myCSVRDD : RDD[List[String]] = sparkContext.parallelize(msCSVdata)
Здесь вы можете втиснуть свои списки строк в класс case, чтобы лучше отражать поля ваших данных CSV. Вы должны получить вдохновение от творений Person
s в этом примере:
https://spark.apache.org/docs/latest/sql-programming-guide.html
Я опускаю этот шаг.
Затем вы можете преобразовать в DataFrame:
import spark.implicits._
myCSVDataframe = myCSVRDD.toDF()
Это решение PySpark , с которым я недавно столкнулся и которое было успешным. Здесь я беру выходные данные консоли dataframe.show и создаю файл данных с помощью CSV API Spark.
Поскольку версия Scala уже существует, эта версия PySpark немного отличается от этой. Я использовал это для преобразования выходных данных консоли Impala /hive в CSV для моего модульного тестирования, и это было действительно полезно.
Я использовал регулярные выражения... удалить +-----+ типы строк
re.sub(r'\n[+-]+\n' , '\n', input_data)
import os
import re
import sys
from pyspark.sql import SparkSession
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# Initialize Spark session
spark = SparkSession.builder \
.appName("String to CSV") \
.getOrCreate()
# Input data as a string
input_data = """
+-----+------------------+-------+
|empid|empname |salary|
| 1| Ram Ghadiyaram| 10000|
+-----+-------+----------+--------+
""".replace("|\n","\n").replace("\n|","\n")
#remove +-----+-------+------+ from the string
input_data = re.sub(r'\n[+-]+\n' , '\n', input_data)
# Capture the input data as a string
df = spark.read.option("header","true").option("inferSchema","true").option("delimiter", "|").csv(spark.sparkContext.parallelize(input_data.split("\n")))
df.printSchema()
# Show the result CSV data
df.show()
Полное объяснение есть в моей статье .