Администратор не выполняет дедупликацию при наличии нескольких «подписчиков» на одном узле в распределенной теме pubsub в akka typed
Изучая подписку на распределенную публикацию в новом API akka.actor.typed, мы видим, что подписка на распределенную публикацию достигается путем представления каждой подтемы публикации с помощью актера akka.actor.typed.pubsub.Topic . Чтобы смоделировать то же самое, мы создали локальный кластер akka с 3 исходными узлами, один из которых выступает в роли издателя, а два других — потребителями. При потреблении мы видим, что оба подписчика получают одни и те же данные, т.е. дедупликация не происходит . Пожалуйста, найдите пример кода для Publisher:
object PubApp {
def pubsubImplementor() = {
val pubsubGurdian = Behaviors.setup[Unit] { (context) =>
val publisher = context.spawn(PublisherEnd(), "publisher")
Thread.sleep(20000) //Waiting for subscriber to go up
(1 to 20).map { i =>
Thread.sleep(1000)
publisher ! Data(i)
}
Behaviors.same
}
implicit val config = ConfigFactory.parseString(
s"""
|akka.remote.artery.canonical.port = 2551
|""".stripMargin
).withFallback(ConfigFactory.load("distributedpubsubtopic.conf"))
ActorSystem(pubsubGurdian, "SamikCluster", config)
}
def main(args: Array[String]): Unit = {
pubsubImplementor()
}
}
object PublisherEnd {
case class Data(i : Int) extends Command
def apply():Behavior[Command] = Behaviors.setup{(context) =>
val topic = context.spawn(Topic[Command]("pub-sub"), "publisherActor")
Behaviors.receive{(context,message) =>
message match {
case m@Data(i) =>
context.log.info(s"message: ${m.i} sent to Sink")
topic ! Topic.Publish(m)
Behaviors.same
}
}
}
}
Пожалуйста, найдите пример кода для подписчиков/потребителей:
object SubApp1 {
def subImplementer() = {
val subGurdian = Behaviors.setup[Unit] { (context) =>
val subscriber1 = context.spawn(SubsriberEnd(), "subscriber1")
Behaviors.same
}
implicit val config = ConfigFactory.parseString(
s"""
|akka.remote.artery.canonical.port = 2552
|""".stripMargin
).withFallback(ConfigFactory.load("distributedpubsubtopic.conf"))
ActorSystem(subGurdian, "SamikCluster", config)
}
def main(args: Array[String]): Unit = {
subImplementer()
}
}
object SubApp2 {
def subImplementer() = {
val subGurdian = Behaviors.setup[Unit] { (context) =>
val subscriber2 = context.spawn(SubsriberEnd(), "subscriber2")
Behaviors.same
}
implicit val config = ConfigFactory.parseString(
s"""
|akka.remote.artery.canonical.port = 2553
|""".stripMargin
).withFallback(ConfigFactory.load("distributedpubsubtopic.conf"))
ActorSystem(subGurdian, "SamikCluster", config)
}
def main(args: Array[String]): Unit = {
subImplementer()
}
}
object SubsriberEnd {
def apply(): Behavior[Command] = Behaviors.setup[Command] { (context) =>
val topic = context.spawn(Topic[Command]("pub-sub"), "subscriberActor")
topic ! Topic.Subscribe(context.self)
Behaviors.receive { (context, message) =>
message match {
case m@Data(i) =>
context.log.info(s"[${context.self.path}] received message: ${m.i} in Sink")
Behaviors.same
}
}
}
}
Вывод, который я получаю от издателя:
09:40:07.566 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 1 sent to Sink
09:40:08.567 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 2 sent to Sink
09:40:09.570 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 3 sent to Sink
09:40:10.570 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 4 sent to Sink
09:40:11.572 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 5 sent to Sink
09:40:12.576 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 6 sent to Sink
09:40:13.581 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 7 sent to Sink
09:40:14.581 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 8 sent to Sink
09:40:15.582 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 9 sent to Sink
09:40:16.582 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 10 sent to Sink
09:40:17.583 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 11 sent to Sink
09:40:18.585 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 12 sent to Sink
09:40:19.586 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 13 sent to Sink
09:40:20.590 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 14 sent to Sink
09:40:21.591 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 15 sent to Sink
09:40:22.591 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 16 sent to Sink
09:40:23.596 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 17 sent to Sink
09:40:24.596 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 18 sent to Sink
09:40:25.599 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 19 sent to Sink
09:40:26.602 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 20 sent to Sink
Ниже приведены выходные данные потребителей: Consumer1, т.е. SubApp1.
09:40:07.734 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 1 in Sink
09:40:08.570 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 2 in Sink
09:40:09.572 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 3 in Sink
09:40:10.572 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 4 in Sink
09:40:11.574 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 5 in Sink
09:40:12.578 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 6 in Sink
09:40:13.582 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 7 in Sink
09:40:14.584 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 8 in Sink
09:40:15.585 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 9 in Sink
09:40:16.583 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 10 in Sink
09:40:17.584 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 11 in Sink
09:40:18.587 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 12 in Sink
09:40:19.588 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 13 in Sink
09:40:20.592 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 14 in Sink
09:40:21.593 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 15 in Sink
09:40:22.593 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 16 in Sink
09:40:23.598 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 17 in Sink
09:40:24.598 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 18 in Sink
09:40:25.600 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 19 in Sink
09:40:26.604 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 20 in Sink
Consumer2 т.е. SubApp2
09:40:07.734 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 1 in Sink
09:40:08.570 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 2 in Sink
09:40:09.572 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 3 in Sink
09:40:10.572 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 4 in Sink
09:40:11.574 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 5 in Sink
09:40:12.578 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 6 in Sink
09:40:13.582 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 7 in Sink
09:40:14.584 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 8 in Sink
09:40:15.585 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 9 in Sink
09:40:16.585 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 10 in Sink
09:40:17.584 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 11 in Sink
09:40:18.587 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 12 in Sink
09:40:19.588 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 13 in Sink
09:40:20.592 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 14 in Sink
09:40:21.593 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 15 in Sink
09:40:22.594 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 16 in Sink
09:40:23.598 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 17 in Sink
09:40:24.599 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 18 in Sink
09:40:25.600 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 19 in Sink
09:40:26.604 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 20 in Sink
Одно и то же сообщение публикуется у обоих подписчиков (действуя как широковещательная рассылка), чего мы не хотим. Подскажите, пожалуйста, как добиться дедупликации, чтобы одно и то же сообщение не публиковалось дважды у разных подписчиков.
1 ответ
Distributed Pub Sub предназначен для варианта использования «один ко многим»: он попытается доставить сообщения, опубликованные в теме, каждому подписчику этой темы во время публикации (приблизительно). Он не предназначен для дедупликации.
Самое близкое, что вы можете получить с помощью Distributed Pub Sub, — это логическая тема, состоящая из нескольких подтем в дополнение к основной теме. Издатель публикует в основной теме, а действующее лицо (вероятно, кластерный синглтон) подписывается на эту тему и повторно публикует каждое сообщение в одной из подтем (в качестве альтернативы производитель может создавать в подтеме).
Обратите внимание, что Distributed Pub Sub не дает никакой полезной гарантии того, как часто будет обрабатываться сообщение, отправленное через него: это максимум один раз для каждого подписчика, но каждое сообщение может быть доставлено и обработано произвольно большим количеством подписчиков, поэтому гарантия наименьшего нулевого времени.