Spark Streaming - пользовательский приемник Travis CI и GitHub - непрерывные данные, но пустой RDD?
В последнее время, как часть научного исследования, я разрабатывал приложение, которое передает (или, по крайней мере, должно) данные из Travis CI и GitHub, используя их REST API. Цель этого состоит в том, чтобы получить представление об отношениях фиксации сборки для дальнейшего выполнения многочисленных анализов.
Для этого я реализовал следующий пользовательский приемник Travis:
object TravisUtils {
def createStream(ctx : StreamingContext, storageLevel: StorageLevel) : ReceiverInputDStream[Build] = new TravisInputDStream(ctx, storageLevel)
}
private[streaming]
class TravisInputDStream(ctx : StreamingContext, storageLevel : StorageLevel) extends ReceiverInputDStream[Build](ctx) {
def getReceiver() : Receiver[Build] = new TravisReceiver(storageLevel)
}
private[streaming]
class TravisReceiver(storageLevel: StorageLevel) extends Receiver[Build](storageLevel) with Logging {
def onStart() : Unit = {
new BuildStream().addListener(new BuildListener {
override def onBuildsReceived(numberOfBuilds: Int): Unit = {
}
override def onBuildRepositoryReceived(build: Build): Unit = {
store(build)
}
override def onException(e: Exception): Unit = {
reportError("Exception while streaming travis", e)
}
})
}
def onStop() : Unit = {
}
}
Принимая во внимание, что получатель использует мою пользовательскую библиотеку TRAVIS API (разработанную на Java с использованием Apache Async Client). Однако проблема заключается в следующем: данные, которые я должен получать, непрерывны и изменяются, то есть постоянно отправляются в Travis и GitHub. В качестве примера рассмотрим тот факт, что GitHub записывает в секунду ок. 350 событий - включая события push, коммит и т. П.
Но при потоковой передаче через GitHub или Travis я получаю данные из первых двух пакетов, но затем после этого RDD, кроме DStream, остаются пустыми - хотя есть данные для потоковой передачи!
Я проверил несколько вещей, включая HttpClient, используемый для пропуска запросов к API, но ни одна из них на самом деле не решила эту проблему.
Поэтому мой вопрос - что может происходить? Почему Spark не передает данные по истечении периода x. Ниже вы можете найти заданный контекст и конфигурацию:
val configuration = new SparkConf().setAppName("StreamingSoftwareAnalytics").setMaster("local[2]")
val ctx = new StreamingContext(configuration, Seconds(3))
val stream = GitHubUtils.createStream(ctx, StorageLevel.MEMORY_AND_DISK_SER)
// RDD IS EMPTY - that is what is happenning!
stream.window(Seconds(9)).foreachRDD(rdd => {
if (rdd.isEmpty()) {println("RDD IS EMPTY")} else {rdd.collect().foreach(event => println(event.getRepo.getName + " " + event.getId))}
})
ctx.start()
ctx.awaitTermination()
Заранее спасибо!