Подтверждение сообщения от RabbitMQ в Spark Streaming

Я использую https://github.com/Stratio/spark-rabbitmq, чтобы прочитать сообщение от rabbitMQ. Я не смог найти способ подтвердить сообщения после того, как обработка сообщения выполнена в искре.

Ниже приведен фрагмент кода:

val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2",
      "queueName" -> config.getString("application.rabbitmq.queue"),
      "exchange" -> config.getString("application.rabbitmq.exchange"),
      "host" -> config.getString("application.rabbitmq.host"),
      "port" -> config.getString("application.rabbitmq.port"),
      "routingKeys" -> config.getString("application.rabbitmq.queue") )


 /*val distributeKey = Seq(
   RabbitMQDistributedKey(
     config.getString("application.rabbitmq.queue"),
     new ExchangeAndRouting(config.getString("application.rabbitmq.exchange"), "query.analyzer"),
     rabbitParams
   )
  )*/



  val receiverStream: InputDStream[String] = RabbitMQUtils.createStream(
    ssc,
    rabbitParams)


  receiverStream.start()

  receiverStream.foreachRDD(
    rdd => {

      if(!rdd.isEmpty()){
         // Processing 
       }
      }

0 ответов

Другие вопросы по тегам