Опции для чтения больших файлов (чистый текст, xml, json, csv) из hdfs в RStudio с SparkR 1.5

Я новичок в Spark и хотел бы знать, есть ли другие варианты, кроме указанных ниже, для чтения данных, хранящихся в hdfs из RStudio с использованием SparkR, или я правильно их использую. Данные могут быть любого типа (чистый текст, CSV, JSON, XML или любая база данных, содержащая реляционные таблицы) и любого размера (1 КБ - несколько ГБ).

Я знаю, что textFile (sc, path) больше не должен использоваться, но есть ли другие возможности для чтения таких данных, кроме функции read.df?

Следующий код использует read.df и jsonFile, но jsonFile выдает ошибку:

Sys.setenv(SPARK_HOME = "C:\\Users\\--\\Downloads\\spark-1.5.0-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
#load the Sparkr library
library(SparkR)

# Create a spark context and a SQL context
sc <- sparkR.init(master="local", sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)

#create a sparkR DataFrame
df <- read.df(sqlContext, "hdfs://0.0.0.0:19000/people.json", source = "json")
df <- jsonFile(sqlContext, "hdfs://0.0.0.0:19000/people.json")

read.df работает для json, но как мне прочитать текст, такой как сообщения журнала, которые разделены только новой строкой? Например

> df <- read.df(sqlContext, "hdfs://0.0.0.0:19000/README.txt", "text")
     Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
  java.lang.ClassNotFoundException: Failed to load class for data source: text.
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
    at org.apache.spark.sql.api.r.SQLUtils$.loadDF(SQLUtils.scala:156)
    at org.apache.spark.sql.api.r.SQLUtils.loadDF(SQLUtils.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:132)
    at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:79)
    at org.apache.spark.ap

Ошибка с jsonFile:

> df <- jsonFile(sqlContext, "hdfs://0.0.0.0:19000/people.json")
    Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : 
  java.io.IOException: No input paths specified in job
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:201)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfu

Я не знаю, почему read.df выдает ошибку, потому что я не перезапустил SparkR и не вызвал SparkR.stop()

Для того же кода, кроме использования read.df, я использую функцию SparkR:::textFile и sc вместо sqlContext(следуя устаревшему Введение в amplab).

Сообщение об ошибке:

data <- SparkR:::textFile(sc, "hdfs://0.0.0.0:19000/people.json")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : 
  java.lang.IllegalArgumentException: java.net.URISyntaxException: Expected scheme-specific part at index 5: hdfs:
    at org.apache.hadoop.fs.Path.initialize(Path.java:206)
    at org.apache.hadoop.fs.Path.<init>(Path.java:172)
    at org.apache.hadoop.fs.Path.<init>(Path.java:94)
    at org.apache.hadoop.fs.Globber.glob(Globber.java:211)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at or

Эта ошибка выглядит как неправильный путь, но я не знаю почему.

Что я сейчас использую:

spark-1.5.0-bin-hadoop2.6 hadoop-2.6.0 Windows (8.1) R Версия 3.2.2 Rstudio Версия 0.99.484

Я надеюсь, что кто-нибудь может дать мне несколько советов по этому вопросу здесь.

2 ответа

Пытаться

    % hadoop fs -put people.json /
    % sparkR
    > people <- read.df(sqlContext, "/people.json", "json")
    > head(people) 

Вероятно, вам нужна библиотека для анализа других файлов, например, библиотека DataBricks CSV:

https://github.com/databricks/spark-csv

Затем вы запустите R с загруженным пакетом, например:

$ sparkR --packages com.databricks:spark-csv_2.10:1.0.3

и загрузите ваш файл как:

> df <- read.df(sqlContext, "cars.csv", source = "com.databricks.spark.csv", inferSchema = "true")

Предполагается, что в вашем домашнем каталоге hdfs есть тестовый файл "cars.csv".

НТН

Другие вопросы по тегам