Как прочитать только n строк большого файла CSV в HDFS с помощью пакета spark-csv?

У меня есть большой распределенный файл в HDFS, и каждый раз, когда я использую sqlContext с пакетом spark-csv, он сначала загружает весь файл, что занимает довольно много времени.

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path")

Теперь, когда я просто хочу сделать быструю проверку, все, что мне нужно, - это несколько / любые n строк всего файла.

df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").take(n)
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").head(n)

но все они запускаются после загрузки файла. Разве я не могу просто ограничить количество строк при чтении самого файла? Я имею в виду n_rows эквивалент панд в spark-csv, например:

pd_df = pandas.read_csv("file_path", nrows=20)

Или это может быть тот случай, когда спарк фактически не загружает файл, первый шаг, но в этом случае, почему мой шаг загрузки файла занимает слишком много времени?

я хочу

df.count()

дать мне только n и не все строки, это возможно?

7 ответов

Ты можешь использовать limit(n),

sqlContext.format('com.databricks.spark.csv') \
          .options(header='true', inferschema='true').load("file_path").limit(20)

Это просто загрузит 20 строк.

Насколько я понимаю, чтение только нескольких строк не поддерживается модулем spark-csv напрямую, и в качестве обходного пути вы можете просто прочитать файл как текстовый файл, взять столько строк, сколько вы хотите, и сохранить его во временном местоположении. Сохраняя строки, вы можете использовать spark-csv для чтения строк, включая inferSchema вариант (который вы можете использовать, если вы находитесь в режиме исследования).

val numberOfLines = ...
spark.
  read.
  text("myfile.csv").
  limit(numberOfLines).
  write.
  text(s"myfile-$numberOfLines.csv")
val justFewLines = spark.
  read.
  option("inferSchema", true). // <-- you are in exploration mode, aren't you?
  csv(s"myfile-$numberOfLines.csv")

Не предполагающая схема и использование limit(n) работал для меня во всех аспектах.

f_schema = StructType([
StructField("col1",LongType(),True),
StructField("col2",IntegerType(),True),
StructField("col3",DoubleType(),True)
...
])

df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true').schema(f_schema).load(data_path).limit(10)

Примечание: если мы используем inferschema='true', это опять то же самое время, и, возможно, отсюда то же самое.

Но если у нас нет представления о схеме, решения Jacek Laskowski тоже хорошо работают.:)

Решение, данное Яцеком Ласковски, работает хорошо. Ниже представлен вариант в памяти.

Я недавно столкнулся с этой проблемой. Я использовал блоки данных и имел огромный каталог csv (200 файлов по 200 МБ каждый)

У меня изначально было

      val df = spark.read.format("csv")
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.load("dbfs:/huge/csv/files/in/this/directory/")

display(df)

это заняло много времени (10+ минут), но затем я изменил его на ниже, и он запустился мгновенно (2 секунды)

      val lines = spark.read.text("dbfs:/huge/csv/files/in/this/directory/").as[String].take(1000)

val df = spark.read
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.csv(spark.createDataset(lines))

display(df)

Вывести схему для текстовых форматов сложно, и это можно сделать таким образом для форматов csv и json(но не для многострочных json) форматов.

Начиная с PySpark 2.3, вы можете просто загружать данные в виде текста, ограничивать и применять читатель csv к результату:

(spark
  .read
  .options(inferSchema="true", header="true")
  .csv(
      spark.read.text("/path/to/file")
          .limit(20)                   # Apply limit
          .rdd.flatMap(lambda x: x)))  # Convert to RDD[str]

Аналог Scala доступен начиная с Spark 2.2:

spark
  .read
  .options(Map("inferSchema" -> "true", "header" -> "true"))
  .csv(spark.read.text("/path/to/file").limit(20).as[String])

В Spark 3.0.0 или новее можно также применить ограничение и использовать from_csv функция, но для этого требуется схема, поэтому она, вероятно, не будет соответствовать вашим требованиям.

Может быть, это будет полезно тем, кто работает в java. Применение лимита не поможет сократить время. Вам нужно собрать n строк из файла.

              DataFrameReader frameReader = spark
          .read()
          .format("csv")
          .option("inferSchema", "true");
    //set framereader options, delimiters etc

    List<String> dataset = spark.read().textFile(filePath).limit(MAX_FILE_READ_SIZE).collectAsList();
    return frameReader.csv(spark.createDataset(dataset, Encoders.STRING()));

Поскольку я не видел этого решения в ответах, мне подходит чистый SQL-подход:

      df = spark.sql("SELECT * FROM csv.`/path/to/file` LIMIT 10000")

Если заголовка нет, столбцы будут называться _c0, _c1 и т. Д. Никакой схемы не требуется.

Другие вопросы по тегам