Подтверждение сообщения от RabbitMQ в Spark Streaming
Я использую https://github.com/Stratio/spark-rabbitmq, чтобы прочитать сообщение от rabbitMQ. Я не смог найти способ подтвердить сообщения после того, как обработка сообщения выполнена в искре.
Ниже приведен фрагмент кода:
val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2",
"queueName" -> config.getString("application.rabbitmq.queue"),
"exchange" -> config.getString("application.rabbitmq.exchange"),
"host" -> config.getString("application.rabbitmq.host"),
"port" -> config.getString("application.rabbitmq.port"),
"routingKeys" -> config.getString("application.rabbitmq.queue") )
/*val distributeKey = Seq(
RabbitMQDistributedKey(
config.getString("application.rabbitmq.queue"),
new ExchangeAndRouting(config.getString("application.rabbitmq.exchange"), "query.analyzer"),
rabbitParams
)
)*/
val receiverStream: InputDStream[String] = RabbitMQUtils.createStream(
ssc,
rabbitParams)
receiverStream.start()
receiverStream.foreachRDD(
rdd => {
if(!rdd.isEmpty()){
// Processing
}
}