Camel, Akka, JMS и отложенное подтверждение сообщения: могу ли я подтвердить ранее обработанное сообщение брокеру?
Я использую Akka (последняя стабильная версия), akka-camel
и JMS (для целей этого разговора, скажем, это ActiveMQ, но в идеале решение должно быть общим).
Вариант использования
У меня есть следующий вариант использования. В очереди Q
Я получаю сообщения, подобные этому:
time: 1 2 3 4 5 6
| A1 | B1 | C1 | C2 | A2 | B2 | ....
^ ^
first latest
Моя конечная цель - соединить их вместе в (A1,A2)
, (B1,B2)
, и так далее; Несмотря на такие сложности, как дубликаты и недоставленные сообщения, сложность заключается в том, что я должен обеспечить, чтобы до тех пор, пока вся пара не была сопоставлена и обработана, брокер сохранял все неподтвержденные сообщения.
пример
В 4
Я получил и обработал 4 сообщения, и обработал успешно пару (C1, C2)
Однако я до сих пор не могу подтвердить что-либо обратно брокеру, потому что A1
а также B1
все еще не имеют аналогов и ожидают, и в JMS, чтобы подтвердить C2
означает подтвердить все сообщения доC2
, На самом деле, первое подтверждение, которое я могу отправить, пришло вовремя 5
, когда A2
получено: в этот момент я могу подтвердить A1
(и только A1
, как B1
все еще в ожидании).
Эта проблема
Теперь я не могу понять, как сделать такое задержанное и асинхронное подтверждение через akka-camel
, Я читал в Интернете, и, хотя я могу найти объяснения о том, как подтверждать сообщения вручную ( документы и пример), нет ничего, показывающего, как подтвердить ранее обработанное сообщение брокеру.
import akka.camel.{ CamelMessage, Consumer }
import akka.camel.Ack
import akka.actor.Status.Failure
class Consumer3 extends Consumer {
override def autoAck = false
def endpointUri = "jms:queue:test"
def receive = {
case msg: CamelMessage =>
sender() ! Ack
// on success
// ..
val someException = new Exception("e1")
// on failure
sender() ! Failure(someException)
}
}
В этом случае, Ack
является object
и его семантика действительно проста: я подтверждаю текущее сообщение, в то время как мне нужно что-то вроде сообщения X, которое теперь подтверждается, где X - какое-то предыдущее сообщение, но не обязательно текущее.
Этот вариант использования поддерживается или поддерживается через akka-camel
или я должен просто построить это сам?
Спасибо
1 ответ
Похоже, что вы хотите ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE ActiveMQ.
К сожалению, INVIDUAL_ACKNOWLEDGE не является частью спецификации JMS, но, согласно этому посту, он широко применяется для каждой очереди. (Я на самом деле не проверял)
Так как это не по спецификации, akka не поддерживает его "из коробки", но, несомненно, внесет хороший вклад!