Как прочитать только n строк большого файла CSV в HDFS с помощью пакета spark-csv?
У меня есть большой распределенный файл в HDFS, и каждый раз, когда я использую sqlContext с пакетом spark-csv, он сначала загружает весь файл, что занимает довольно много времени.
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path")
Теперь, когда я просто хочу сделать быструю проверку, все, что мне нужно, - это несколько / любые n строк всего файла.
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").take(n)
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").head(n)
но все они запускаются после загрузки файла. Разве я не могу просто ограничить количество строк при чтении самого файла? Я имею в виду n_rows эквивалент панд в spark-csv, например:
pd_df = pandas.read_csv("file_path", nrows=20)
Или это может быть тот случай, когда спарк фактически не загружает файл, первый шаг, но в этом случае, почему мой шаг загрузки файла занимает слишком много времени?
я хочу
df.count()
дать мне только n
и не все строки, это возможно?
7 ответов
Ты можешь использовать limit(n)
,
sqlContext.format('com.databricks.spark.csv') \
.options(header='true', inferschema='true').load("file_path").limit(20)
Это просто загрузит 20 строк.
Насколько я понимаю, чтение только нескольких строк не поддерживается модулем spark-csv напрямую, и в качестве обходного пути вы можете просто прочитать файл как текстовый файл, взять столько строк, сколько вы хотите, и сохранить его во временном местоположении. Сохраняя строки, вы можете использовать spark-csv для чтения строк, включая inferSchema
вариант (который вы можете использовать, если вы находитесь в режиме исследования).
val numberOfLines = ...
spark.
read.
text("myfile.csv").
limit(numberOfLines).
write.
text(s"myfile-$numberOfLines.csv")
val justFewLines = spark.
read.
option("inferSchema", true). // <-- you are in exploration mode, aren't you?
csv(s"myfile-$numberOfLines.csv")
Не предполагающая схема и использование limit(n)
работал для меня во всех аспектах.
f_schema = StructType([
StructField("col1",LongType(),True),
StructField("col2",IntegerType(),True),
StructField("col3",DoubleType(),True)
...
])
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true').schema(f_schema).load(data_path).limit(10)
Примечание: если мы используем inferschema='true'
, это опять то же самое время, и, возможно, отсюда то же самое.
Но если у нас нет представления о схеме, решения Jacek Laskowski тоже хорошо работают.:)
Решение, данное Яцеком Ласковски, работает хорошо. Ниже представлен вариант в памяти.
Я недавно столкнулся с этой проблемой. Я использовал блоки данных и имел огромный каталог csv (200 файлов по 200 МБ каждый)
У меня изначально было
val df = spark.read.format("csv")
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.load("dbfs:/huge/csv/files/in/this/directory/")
display(df)
это заняло много времени (10+ минут), но затем я изменил его на ниже, и он запустился мгновенно (2 секунды)
val lines = spark.read.text("dbfs:/huge/csv/files/in/this/directory/").as[String].take(1000)
val df = spark.read
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.csv(spark.createDataset(lines))
display(df)
Вывести схему для текстовых форматов сложно, и это можно сделать таким образом для форматов csv и json(но не для многострочных json) форматов.
Начиная с PySpark 2.3, вы можете просто загружать данные в виде текста, ограничивать и применять читатель csv к результату:
(spark
.read
.options(inferSchema="true", header="true")
.csv(
spark.read.text("/path/to/file")
.limit(20) # Apply limit
.rdd.flatMap(lambda x: x))) # Convert to RDD[str]
Аналог Scala доступен начиная с Spark 2.2:
spark
.read
.options(Map("inferSchema" -> "true", "header" -> "true"))
.csv(spark.read.text("/path/to/file").limit(20).as[String])
В Spark 3.0.0 или новее можно также применить ограничение и использовать from_csv
функция, но для этого требуется схема, поэтому она, вероятно, не будет соответствовать вашим требованиям.
Может быть, это будет полезно тем, кто работает в java. Применение лимита не поможет сократить время. Вам нужно собрать n строк из файла.
DataFrameReader frameReader = spark
.read()
.format("csv")
.option("inferSchema", "true");
//set framereader options, delimiters etc
List<String> dataset = spark.read().textFile(filePath).limit(MAX_FILE_READ_SIZE).collectAsList();
return frameReader.csv(spark.createDataset(dataset, Encoders.STRING()));
Поскольку я не видел этого решения в ответах, мне подходит чистый SQL-подход:
df = spark.sql("SELECT * FROM csv.`/path/to/file` LIMIT 10000")
Если заголовка нет, столбцы будут называться _c0, _c1 и т. Д. Никакой схемы не требуется.