Администратор не выполняет дедупликацию при наличии нескольких «подписчиков» на одном узле в распределенной теме pubsub в akka typed

Изучая распределённую публикацию-подписку (Distributed Publish Subscribe) в новом API akka.actor.typed, мы видим, что она реализуется путём представления каждой темы pub-sub отдельным актором akka.actor.typed.pubsub.Topic. Чтобы смоделировать это, мы создали локальный кластер Akka из 3 seed-узлов, один из которых выступает в роли издателя, а два других — в роли потребителей. При потреблении мы видим, что оба подписчика получают одни и те же данные, т.е. дедупликация не происходит. Ниже приведён пример кода издателя (Publisher):

      object PubApp {

  /**
   * Starts the publisher node of the "SamikCluster" cluster on Artery port 2551.
   *
   * The guardian spawns a [[PublisherEnd]] actor and schedules `Data(1)` to `Data(20)`
   * for it. `Data(i)` is delivered `20 + i` seconds after start-up: the initial
   * 20 seconds leave the subscriber nodes time to come up, then one message follows
   * per second.
   *
   * @return the actor system running on this node
   */
  def pubsubImplementor(): ActorSystem[Unit] = {
    import scala.concurrent.duration._

    // Grace period before the first publication, so subscribers have time to start.
    val subscriberStartupDelay = 20.seconds

    val pubsubGuardian = Behaviors.setup[Unit] { context =>
      val publisher = context.spawn(PublisherEnd(), "publisher")

      // Scheduling keeps the original timing (20 s + i * 1 s) without parking a
      // dispatcher thread in Thread.sleep for the whole 40 seconds.
      (1 to 20).foreach { i =>
        context.scheduleOnce(subscriberStartupDelay + i.seconds, publisher, Data(i))
      }

      // The guardian handles no messages. Behaviors.same is meant for message
      // processing; there is no previous behavior to keep when returning from setup.
      Behaviors.empty
    }

    val config = ConfigFactory.parseString(
      """
        |akka.remote.artery.canonical.port = 2551
        |""".stripMargin
    ).withFallback(ConfigFactory.load("distributedpubsubtopic.conf"))

    ActorSystem(pubsubGuardian, "SamikCluster", config)
  }

  /** Entry point: boots the publisher node. */
  def main(args: Array[String]): Unit = {
    pubsubImplementor()
  }

}
      object PublisherEnd {

  /** Command carrying the integer payload `i` that is to be published. */
  case class Data(i : Int) extends Command

  /**
   * Behavior of the publishing actor.
   *
   * On start it spawns a child `Topic` actor (child name "publisherActor") for the
   * topic named "pub-sub". Every `Data` message it receives is logged and then
   * published to that topic.
   */
  def apply():Behavior[Command] =
    Behaviors.setup { ctx =>
      val pubSubTopic = ctx.spawn(Topic[Command]("pub-sub"), "publisherActor")

      Behaviors.receiveMessage {
        case data: Data =>
          ctx.log.info(s"message: ${data.i} sent to Sink")
          pubSubTopic ! Topic.Publish(data)
          Behaviors.same
      }
    }
}

Ниже приведён пример кода подписчиков (потребителей):

      object SubApp1 {

  /**
   * Starts the first subscriber node of the "SamikCluster" cluster on Artery port 2552.
   *
   * The guardian only spawns a [[SubsriberEnd]] actor named "subscriber1" and
   * handles no messages itself.
   *
   * @return the actor system running on this node
   */
  def subImplementer(): ActorSystem[Unit] = {
    val subGuardian = Behaviors.setup[Unit] { context =>
      context.spawn(SubsriberEnd(), "subscriber1")

      // Behaviors.same is meant for message processing; there is no previous
      // behavior to keep when returning from setup, so stay idle with empty.
      Behaviors.empty
    }

    val config = ConfigFactory.parseString(
      """
        |akka.remote.artery.canonical.port = 2552
        |""".stripMargin
    ).withFallback(ConfigFactory.load("distributedpubsubtopic.conf"))

    ActorSystem(subGuardian, "SamikCluster", config)
  }

  /** Entry point: boots the first subscriber node. */
  def main(args: Array[String]): Unit = {
    subImplementer()
  }
}

object SubApp2 {

  /**
   * Starts the second subscriber node of the "SamikCluster" cluster on Artery port 2553.
   *
   * The guardian only spawns a [[SubsriberEnd]] actor named "subscriber2" and
   * handles no messages itself.
   *
   * @return the actor system running on this node
   */
  def subImplementer(): ActorSystem[Unit] = {
    val subGuardian = Behaviors.setup[Unit] { context =>
      context.spawn(SubsriberEnd(), "subscriber2")

      // Behaviors.same is meant for message processing; there is no previous
      // behavior to keep when returning from setup, so stay idle with empty.
      Behaviors.empty
    }

    val config = ConfigFactory.parseString(
      """
        |akka.remote.artery.canonical.port = 2553
        |""".stripMargin
    ).withFallback(ConfigFactory.load("distributedpubsubtopic.conf"))

    ActorSystem(subGuardian, "SamikCluster", config)
  }

  /** Entry point: boots the second subscriber node. */
  def main(args: Array[String]): Unit = {
    subImplementer()
  }
}
      object SubsriberEnd {

  /**
   * Behavior of a subscribing actor.
   *
   * On start it spawns a child `Topic` actor (child name "subscriberActor") for the
   * topic named "pub-sub" and subscribes itself to it. Every `Data` message it
   * receives is logged together with this actor's path.
   */
  def apply(): Behavior[Command] =
    Behaviors.setup[Command] { ctx =>
      val pubSubTopic = ctx.spawn(Topic[Command]("pub-sub"), "subscriberActor")
      pubSubTopic ! Topic.Subscribe(ctx.self)

      Behaviors.receiveMessage {
        case data: Data =>
          ctx.log.info(s"[${ctx.self.path}] received message: ${data.i} in Sink")
          Behaviors.same
      }
    }

}

Вывод, который я получаю от издателя:

      09:40:07.566 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 1 sent to Sink
09:40:08.567 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 2 sent to Sink
09:40:09.570 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 3 sent to Sink
09:40:10.570 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 4 sent to Sink
09:40:11.572 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 5 sent to Sink
09:40:12.576 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 6 sent to Sink
09:40:13.581 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 7 sent to Sink
09:40:14.581 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 8 sent to Sink
09:40:15.582 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 9 sent to Sink
09:40:16.582 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 10 sent to Sink
09:40:17.583 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 11 sent to Sink
09:40:18.585 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 12 sent to Sink
09:40:19.586 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 13 sent to Sink
09:40:20.590 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 14 sent to Sink
09:40:21.591 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 15 sent to Sink
09:40:22.591 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 16 sent to Sink
09:40:23.596 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 17 sent to Sink
09:40:24.596 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 18 sent to Sink
09:40:25.599 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 19 sent to Sink
09:40:26.602 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 20 sent to Sink

Ниже приведены выходные данные потребителей: Consumer1, т.е. SubApp1.

      09:40:07.734 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 1 in Sink
09:40:08.570 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 2 in Sink
09:40:09.572 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 3 in Sink
09:40:10.572 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 4 in Sink
09:40:11.574 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 5 in Sink
09:40:12.578 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 6 in Sink
09:40:13.582 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 7 in Sink
09:40:14.584 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 8 in Sink
09:40:15.585 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 9 in Sink
09:40:16.583 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 10 in Sink
09:40:17.584 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 11 in Sink
09:40:18.587 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 12 in Sink
09:40:19.588 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 13 in Sink
09:40:20.592 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 14 in Sink
09:40:21.593 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 15 in Sink
09:40:22.593 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 16 in Sink
09:40:23.598 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 17 in Sink
09:40:24.598 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 18 in Sink
09:40:25.600 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 19 in Sink
09:40:26.604 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 20 in Sink

Consumer2 т.е. SubApp2

      09:40:07.734 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 1 in Sink
09:40:08.570 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 2 in Sink
09:40:09.572 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 3 in Sink
09:40:10.572 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 4 in Sink
09:40:11.574 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 5 in Sink
09:40:12.578 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 6 in Sink
09:40:13.582 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 7 in Sink
09:40:14.584 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 8 in Sink
09:40:15.585 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 9 in Sink
09:40:16.585 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 10 in Sink
09:40:17.584 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 11 in Sink
09:40:18.587 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 12 in Sink
09:40:19.588 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 13 in Sink
09:40:20.592 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 14 in Sink
09:40:21.593 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 15 in Sink
09:40:22.594 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 16 in Sink
09:40:23.598 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 17 in Sink
09:40:24.599 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 18 in Sink
09:40:25.600 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 19 in Sink
09:40:26.604 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 20 in Sink

Одно и то же сообщение доставляется обоим подписчикам (то есть тема работает как широковещательная рассылка), а нам это не нужно. Подскажите, пожалуйста, как добиться дедупликации, чтобы одно и то же сообщение не доставлялось дважды разным подписчикам.

1 ответ

Distributed Pub Sub предназначен для варианта использования «один ко многим»: он попытается доставить сообщения, опубликованные в теме, каждому подписчику этой темы во время публикации (приблизительно). Он не предназначен для дедупликации.

Самое близкое, чего можно добиться с помощью Distributed Pub Sub, — это логическая тема, состоящая из основной темы и нескольких подтем. Издатель публикует в основную тему, а актор (вероятно, кластерный синглтон) подписывается на неё и повторно публикует каждое сообщение в одну из подтем (в качестве альтернативы издатель может публиковать сразу в нужную подтему).

Обратите внимание, что Distributed Pub Sub не даёт никакой полезной гарантии относительно того, сколько раз будет обработано отправленное через него сообщение: для каждого подписчика это «не более одного раза» (at-most-once), но каждое сообщение может быть доставлено произвольно большому числу подписчиков и обработано ими, поэтому итоговая гарантия — «не менее нуля раз» (at-least-zero-times).

Другие вопросы по тегам