Akka Camel: сообщения перенаправлены на неверный маршрут SEDA

Мой упрощенный Akka Camel Приложение настроено следующим образом:

AppleProducer -> seda:appleRoute -> AppleConsumer

OrangeProducer -> seda:orangeRoute -> OrangeConsumer

Я вижу только то, что Apple события периодически потребляются OrangeConsumer, и наоборот.

Выполнение этого примера (возможно, несколько раз) ниже воссоздает его.

Я не понимаю, как это происходит только периодически. Что я делаю неправильно?

object TestApp extends App {
  implicit val system = ActorSystem()
  val camel = CamelExtension(system)
  val appleProducer = system.actorOf(Props(classOf[MyProducer], "seda:appleRoute"), "AppleProducer")
  system.actorOf(Props(classOf[MyAppleConsumer], "seda:appleRoute"), "AppleConsumer")
  val orangeProducer = system.actorOf(Props(classOf[MyProducer], "seda:orangeRoute"), "OrangeProducer")
  system.actorOf(Props(classOf[MyOrangeConsumer], "seda:orangeRoute"), "OrangeConsumer")

  appleProducer ! new Apple("1")
  orangeProducer ! new Orange("1")
  appleProducer ! new Apple("2")
  orangeProducer ! new Orange("2")
  appleProducer ! new Apple("3")
  orangeProducer ! new Orange("3")
  appleProducer ! new Apple("4")
  orangeProducer ! new Orange("4")
  appleProducer ! new Apple("5")
  orangeProducer ! new Orange("5")
  appleProducer ! new Apple("6")
  orangeProducer ! new Orange("6")

}

class MyProducer(route: String) extends Actor with ActorLogging  {

  def receive = {
    case payload: Any =>
      val template = CamelExtension(context.system).template
      template.setDefaultEndpointUri(route)
      template.sendBody(payload)
  }
}

class MyAppleConsumer(route: String) extends Consumer with ActorLogging {
  override def endpointUri: String = route

  override def receive: Receive = {
    case event: CamelMessage if event.body.isInstanceOf[Apple] =>
      log.info("Received event {}", event.body)
    case _ => throw new IllegalArgumentException("Invalid entity")
  }
}

class MyOrangeConsumer(route: String) extends Consumer with ActorLogging {
  override def endpointUri: String = route

  override def receive: Receive = {
    case event: CamelMessage if event.body.isInstanceOf[Orange] =>
      log.info("Received event {}", event.body)
    case _ => throw new IllegalArgumentException("Invalid entity")
  }
}

class Apple(id: String)
class Orange(id: String)

2 ответа

Решение

Я думаю, что мне удалось выяснить это в конце концов.

Проблема не имеет ничего общего с SEDA. Вместо этого кажется, что то же самое DefaultProducerTemplate возвращается для нескольких MyProducer экземпляров.

Поэтому при настройке defaultEndpointUri

Решением для меня было создать только один экземпляр MyProducer актер, чтобы убедиться, что мы не сталкиваемся с этим условием гонки

Я бы порекомендовал расширить черту Producer вместо использования шаблона для вашего MyProducer так же, как вы используете Consumer для тебя MyAppleConsumer а также MyOrangeConsumer,

class MyProducer(route: String) extends Producer with OneWay  {
  def endpointUri = route 
}

Более подробную информацию можно найти здесь: http://doc.akka.io/docs/akka/snapshot/scala/camel.html

Я считаю, что вы должны иметь возможность упростить свой код следующим образом (заявление об отказе: не скомпилировано и не протестировано!):

case class Apple(id: String)
case class Orange(id: String)

object TestApp extends App {
  implicit val system = ActorSystem()

  val appleProducer = system.actorOf(Props(classOf[MyProducer], "seda:appleRoute"), "AppleProducer")
  system.actorOf(Props(classOf[MyConsumer], "seda:appleRoute"), "AppleConsumer")
  val orangeProducer = system.actorOf(Props(classOf[MyProducer], "seda:orangeRoute"), "OrangeProducer")
  system.actorOf(Props(classOf[MyConsumer], "seda:orangeRoute"), "OrangeConsumer")

  appleProducer ! Apple("1")
  orangeProducer ! Orange("1")
  appleProducer ! Apple("2")
  orangeProducer ! Orange("2")
  appleProducer ! Apple("3")
  orangeProducer ! Orange("3")
  appleProducer ! Apple("4")
  orangeProducer ! Orange("4")
  appleProducer ! Apple("5")
  orangeProducer ! Orange("5")
  appleProducer ! Apple("6")
  orangeProducer ! Orange("6")

}

class MyProducer(route: String) extends Producer with OneWay with ActorLogging  {
  def endpointUri = route
}

class MyConsumer(route: String) extends Consumer with ActorLogging {
  override def endpointUri: String = route

  override def receive: Receive = {
    case CamelMessage(body : Apple, headers) =>
      log.info("Received event {}", body)
    case CamelMessage(body : Orange, headers) =>
      log.info("Received event {}", body)
    case _ => throw new IllegalArgumentException("Invalid entity")
  }
}
Другие вопросы по тегам