Вызов службы RestAPI от Spark Streaming
У меня есть случай использования, когда мне нужно вызвать RESTAPI из потоковой передачи с искрой после того, как сообщения будут считаны из Kafka, чтобы выполнить некоторые вычисления и сохранить результат в HDFS и стороннем приложении.
У меня есть несколько сомнений здесь:
- Как мы можем вызвать RESTAPI прямо из потока искры.
- Как управлять таймаутом RESTAPI с помощью потокового пакетного времени.
1 ответ
Решение
Этот код не будет компилироваться как есть. Но это подход для данного варианта использования.
val conf = new SparkConf().setAppName("App name").setMaster("yarn")
val ssc = new StreamingContext(conf, Seconds(1))
val dstream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
dstream.foreachRDD { rdd =>
//Write the rdd to HDFS directly
rdd.saveAsTextFile("hdfs/location/to/save")
//loop through each parttion in rdd
rdd.foreachPartition { partitionOfRecords =>
//1. Create HttpClient object here
//2.a POST data to API
//Use it if you want record level control in rdd or partion
partitionOfRecords.foreach { record =>
//2.b Post the the date to API
record.toString
}
}
//Use 2.a or 2.b to POST data as per your req
}
ssc.start()
ssc.awaitTermination()
Большинство HttpClients (для вызова REST) поддерживает время ожидания запроса.
Пример вызова POST Http с тайм-аутом с использованием Apache HttpClient
val CONNECTION_TIMEOUT_MS = 20000; // Timeout in millis (20 sec).
val requestConfig = RequestConfig.custom()
.setConnectionRequestTimeout(CONNECTION_TIMEOUT_MS)
.setConnectTimeout(CONNECTION_TIMEOUT_MS)
.setSocketTimeout(CONNECTION_TIMEOUT_MS)
.build();
HttpClientBuilder.create().build();
val client: CloseableHttpClient = HttpClientBuilder.create().build();
val url = "https://selfsolve.apple.com/wcResults.do"
val post = new HttpPost(url);
//Set config to post
post.setConfig(requestConfig)
post.setEntity(EntityBuilder.create.setText("some text to post to API").build())
val response: HttpResponse = client.execute(post)