Akka Camel: сообщения перенаправлены на неверный маршрут SEDA
Мой упрощенный Akka Camel
Приложение настроено следующим образом:
AppleProducer -> seda:appleRoute -> AppleConsumer
OrangeProducer -> seda:orangeRoute -> OrangeConsumer
Я вижу только то, что Apple
события периодически потребляются OrangeConsumer
, и наоборот.
Выполнение этого примера (возможно, несколько раз) ниже воссоздает его.
Я не понимаю, как это происходит только периодически. Что я делаю неправильно?
object TestApp extends App {
implicit val system = ActorSystem()
val camel = CamelExtension(system)
val appleProducer = system.actorOf(Props(classOf[MyProducer], "seda:appleRoute"), "AppleProducer")
system.actorOf(Props(classOf[MyAppleConsumer], "seda:appleRoute"), "AppleConsumer")
val orangeProducer = system.actorOf(Props(classOf[MyProducer], "seda:orangeRoute"), "OrangeProducer")
system.actorOf(Props(classOf[MyOrangeConsumer], "seda:orangeRoute"), "OrangeConsumer")
appleProducer ! new Apple("1")
orangeProducer ! new Orange("1")
appleProducer ! new Apple("2")
orangeProducer ! new Orange("2")
appleProducer ! new Apple("3")
orangeProducer ! new Orange("3")
appleProducer ! new Apple("4")
orangeProducer ! new Orange("4")
appleProducer ! new Apple("5")
orangeProducer ! new Orange("5")
appleProducer ! new Apple("6")
orangeProducer ! new Orange("6")
}
class MyProducer(route: String) extends Actor with ActorLogging {
def receive = {
case payload: Any =>
val template = CamelExtension(context.system).template
template.setDefaultEndpointUri(route)
template.sendBody(payload)
}
}
class MyAppleConsumer(route: String) extends Consumer with ActorLogging {
override def endpointUri: String = route
override def receive: Receive = {
case event: CamelMessage if event.body.isInstanceOf[Apple] =>
log.info("Received event {}", event.body)
case _ => throw new IllegalArgumentException("Invalid entity")
}
}
class MyOrangeConsumer(route: String) extends Consumer with ActorLogging {
override def endpointUri: String = route
override def receive: Receive = {
case event: CamelMessage if event.body.isInstanceOf[Orange] =>
log.info("Received event {}", event.body)
case _ => throw new IllegalArgumentException("Invalid entity")
}
}
class Apple(id: String)
class Orange(id: String)
2 ответа
Я думаю, что мне удалось выяснить это в конце концов.
Проблема не имеет ничего общего с SEDA. Вместо этого кажется, что то же самое DefaultProducerTemplate
возвращается для нескольких MyProducer
экземпляров.
Поэтому при настройке defaultEndpointUri
Решением для меня было создать только один экземпляр MyProducer
актер, чтобы убедиться, что мы не сталкиваемся с этим условием гонки
Я бы порекомендовал расширить черту Producer
вместо использования шаблона для вашего MyProducer
так же, как вы используете Consumer
для тебя MyAppleConsumer
а также MyOrangeConsumer
,
class MyProducer(route: String) extends Producer with OneWay {
def endpointUri = route
}
Более подробную информацию можно найти здесь: http://doc.akka.io/docs/akka/snapshot/scala/camel.html
Я считаю, что вы должны иметь возможность упростить свой код следующим образом (заявление об отказе: не скомпилировано и не протестировано!):
case class Apple(id: String)
case class Orange(id: String)
object TestApp extends App {
implicit val system = ActorSystem()
val appleProducer = system.actorOf(Props(classOf[MyProducer], "seda:appleRoute"), "AppleProducer")
system.actorOf(Props(classOf[MyConsumer], "seda:appleRoute"), "AppleConsumer")
val orangeProducer = system.actorOf(Props(classOf[MyProducer], "seda:orangeRoute"), "OrangeProducer")
system.actorOf(Props(classOf[MyConsumer], "seda:orangeRoute"), "OrangeConsumer")
appleProducer ! Apple("1")
orangeProducer ! Orange("1")
appleProducer ! Apple("2")
orangeProducer ! Orange("2")
appleProducer ! Apple("3")
orangeProducer ! Orange("3")
appleProducer ! Apple("4")
orangeProducer ! Orange("4")
appleProducer ! Apple("5")
orangeProducer ! Orange("5")
appleProducer ! Apple("6")
orangeProducer ! Orange("6")
}
class MyProducer(route: String) extends Producer with OneWay with ActorLogging {
def endpointUri = route
}
class MyConsumer(route: String) extends Consumer with ActorLogging {
override def endpointUri: String = route
override def receive: Receive = {
case CamelMessage(body : Apple, headers) =>
log.info("Received event {}", body)
case CamelMessage(body : Orange, headers) =>
log.info("Received event {}", body)
case _ => throw new IllegalArgumentException("Invalid entity")
}
}