Spark SQL: как использовать данные JSON из службы REST в качестве DataFrame
Мне нужно прочитать некоторые данные JSON из веб-службы, предоставляющей REST-интерфейсы, для запроса данных из моего кода SPARK SQL для анализа. Я могу прочитать JSON, хранящийся в хранилище BLOB-объектов и использовать его.
Мне было интересно, как лучше всего прочитать данные из службы REST и использовать их как любой другой DataFrame
,
Кстати я использую SPARK 1.6 of Linux cluster on HD insight
если это поможет Также был бы признателен, если кто-то может поделиться фрагментами кода для того же, что и я все еще очень плохо знаком со средой SPARK.
2 ответа
На Spark 1.6:
Если вы работаете на Python, используйте библиотеку запросов для получения информации, а затем просто создайте из нее СДР. Должна быть какая-то похожая библиотека для Scala (соответствующая тема). Тогда просто сделайте:
json_str = '{"executorCores": 2, "kind": "pyspark", "driverMemory": 1000}'
rdd = sc.parallelize([json_str])
json_df = sqlContext.jsonRDD(rdd)
json_df
Код для Scala:
val anotherPeopleRDD = sc.parallelize(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
Это от: http://spark.apache.org/docs/latest/sql-programming-guide.html
Spark не может проанализировать произвольный json для фрейма данных, потому что json имеет иерархическую структуру, а фрейм данных плоский. Если ваш json не был создан с помощью spark, есть вероятность, что он не соответствует условию "Каждая строка должна содержать отдельный, самодостаточный действительный объект JSON", и, следовательно, его необходимо будет проанализировать с использованием вашего пользовательского кода, а затем передать в dataframe как коллекция объектов класса case или spark sql Rows.
Вы можете скачать как:
import scalaj.http._
val response = Http("proto:///path/to/json")
.header("key", "val").method("get")
.execute().asString.body
а затем проанализируйте ваш JSON, как показано в этом ответе. А затем создайте Seq объектов вашего case-класса (скажем, seq) и создайте фрейм данных как
seq.toDF
Вот так:- искра 2.2
import org.apache.spark.sql._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.DefaultHttpClient
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
object SparkRestApi {
def main(args: Array[String]): Unit = {
val logger = Logger.getLogger("blah")
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
val spark = SparkSession.builder()
.appName("blah")
.config("spark.sql.warehouse.dir", "C:\\Temp\\hive")
.master("local[2]")
//.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val url = "https://api.github.com/users/hadley/orgs"
val result2 = List(scala.io.Source.fromURL(url).mkString)
val githubRdd2=spark.sparkContext.makeRDD(result2)
val gitHubDF2=spark.read.json(githubRdd2)
println(gitHubDF2)
gitHubDF2.show()
spark.stop()
}
}