Spark Streaming - пользовательский приемник Travis CI и GitHub - непрерывные данные, но пустой RDD?

В последнее время, как часть научного исследования, я разрабатывал приложение, которое передает (или, по крайней мере, должно) данные из Travis CI и GitHub, используя их REST API. Цель этого состоит в том, чтобы получить представление об отношениях фиксации сборки для дальнейшего выполнения многочисленных анализов.

Для этого я реализовал следующий пользовательский приемник Travis:

object TravisUtils { 

  def createStream(ctx : StreamingContext, storageLevel: StorageLevel) : ReceiverInputDStream[Build] = new TravisInputDStream(ctx, storageLevel) 

} 

private[streaming] 
class TravisInputDStream(ctx : StreamingContext, storageLevel : StorageLevel) extends ReceiverInputDStream[Build](ctx) { 

  def getReceiver() : Receiver[Build] = new TravisReceiver(storageLevel) 

} 

private[streaming] 
class TravisReceiver(storageLevel: StorageLevel) extends Receiver[Build](storageLevel) with Logging { 

  def onStart() : Unit = { 
     new BuildStream().addListener(new BuildListener { 

       override def onBuildsReceived(numberOfBuilds: Int): Unit = { 

       } 

       override def onBuildRepositoryReceived(build: Build): Unit = { 
         store(build) 
       } 

       override def onException(e: Exception): Unit = { 
         reportError("Exception while streaming travis", e) 
       } 
    }) 
  } 

  def onStop() : Unit = { 

  } 
} 

Принимая во внимание, что получатель использует мою пользовательскую библиотеку TRAVIS API (разработанную на Java с использованием Apache Async Client). Однако проблема заключается в следующем: данные, которые я должен получать, непрерывны и изменяются, то есть постоянно отправляются в Travis и GitHub. В качестве примера рассмотрим тот факт, что GitHub записывает в секунду ок. 350 событий - включая события push, коммит и т. П.

Но при потоковой передаче через GitHub или Travis я получаю данные из первых двух пакетов, но затем после этого RDD, кроме DStream, остаются пустыми - хотя есть данные для потоковой передачи!

Я проверил несколько вещей, включая HttpClient, используемый для пропуска запросов к API, но ни одна из них на самом деле не решила эту проблему.

Поэтому мой вопрос - что может происходить? Почему Spark не передает данные по истечении периода x. Ниже вы можете найти заданный контекст и конфигурацию:

val configuration = new SparkConf().setAppName("StreamingSoftwareAnalytics").setMaster("local[2]") 

val ctx = new StreamingContext(configuration, Seconds(3)) 

val stream = GitHubUtils.createStream(ctx, StorageLevel.MEMORY_AND_DISK_SER) 

// RDD IS EMPTY - that is what is happenning! 
stream.window(Seconds(9)).foreachRDD(rdd => { 
  if (rdd.isEmpty()) {println("RDD IS EMPTY")} else {rdd.collect().foreach(event => println(event.getRepo.getName + " " + event.getId))} 
}) 

ctx.start() 
ctx.awaitTermination() 

Заранее спасибо!

0 ответов

Другие вопросы по тегам