Как отправить в реальном времени количество активных подключений WebSocket через HTTP WebSocket Akka? (работает только с Akka Streams)
Как можно посчитать в реальном времени количество активных подключений WebSocket через Akka WebSocket? Кажется, что Akka HTTP WebSockets изменяет ход предварительной подготовки Akka Stream.
В первом блоке кода, приведенном ниже, счетчик отправляется только тогда, когда это единственное соединение. Любые клиенты, которые подключаются, когда число уже превышает 1, не получают обновления до тех пор, пока не подключится следующий клиент.
Во втором блоке кода удален код WebSocket, поэтому используются только потоки Akka, и каждая подписка на поток получает немедленный счет, как показано в stdout.
Использование Akka HTTP WebSocket
Запуск этого кода и доступ к http://localhost:8080/ с несколькими окнами браузера показывает это в их браузерных консолях.
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.TextMessage
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.ExecutionContextExecutor
object CounterFlow {
private implicit val actorSystem: ActorSystem = ActorSystem("CounterFlowTest")
private implicit val materializer: ActorMaterializer = ActorMaterializer()
private implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
val (counterSourceQueue, counterSource) =
Source.queue[Int](0, OverflowStrategy.backpressure)
.conflate(_ + _)
.scan(0)(_ + _)
.toMat(BroadcastHub.sink(bufferSize = 1))(Keep.both)
.run()
val clientFlow: Source[TextMessage.Strict, Unit] =
counterSource
.map(_.toString)
.map(TextMessage.Strict)
.prepend(Source.fromIterator(() => {
counterSourceQueue.offer(1)
Iterator.empty
}))
.watchTermination()((_, done) => done.foreach(_ => counterSourceQueue.offer(-1)))
def main(args: Array[String]): Unit = {
val route: Route =
pathEndOrSingleSlash {
get {
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,
"<h1>Check browser console for connection count</h1>" +
"<script>const ws = new WebSocket(\"ws://localhost:8080/ws\");ws.onmessage = e => console.log(e.data);</script>"))
}
} ~
path("ws") {
handleWebSocketMessages(
Flow.fromSinkAndSourceCoupled(
Sink.ignore,
clientFlow))
}
Http().bindAndHandle(route, "0.0.0.0", 8080)
() // discard non-Unit value
}
}
Только Akka Streams
Как видно на stdout, каждый клиент получает немедленный счет по желанию.
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.ExecutionContext.Implicits.global
object CounterFlow {
private implicit val actorSystem: ActorSystem = ActorSystem("CounterFlowTEst")
private implicit val materializer: ActorMaterializer = ActorMaterializer()
val (counterSourceQueue, counterSource) = Source.queue[Int](0, OverflowStrategy.backpressure)
.conflate(_ + _)
.scan(0)(_ + _)
.toMat(BroadcastHub.sink)(Keep.both)
.run()
def subscribeClient(clientName: String): Unit =
counterSource
.prepend(Source.fromIterator(() => {
counterSourceQueue.offer(1)
Iterator.empty
}))
.watchTermination()((_, done) => done.foreach(_ => counterSourceQueue.offer(-1)))
.runWith(Sink.foreach(msg => println(s"$clientName $msg")))
.foreach(_ => println(s"$clientName Done"))
def main(args: Array[String]): Unit = {
subscribeClient("A")
subscribeClient("B")
Thread.sleep(500L)
subscribeClient("C")
}
}
1 ответ
Путем изменения BroadcastHub.sink(bufferSize = 1)
в BroadcastHub.sink
(как в версии Akka Streams) каждый клиент получает счет немедленно.
Вот версия, с которой я тестировал:
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.TextMessage
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.ExecutionContextExecutor
object CounterFlow {
private implicit val actorSystem: ActorSystem = ActorSystem("CounterFlowTest")
private implicit val materializer: ActorMaterializer = ActorMaterializer()
private implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
val (counterSourceQueue, counterSource) =
Source.queue[Int](0, OverflowStrategy.backpressure)
.conflate(_ + _)
.scan(0)(_ + _)
.toMat(BroadcastHub.sink)(Keep.both)
.run()
val clientFlow: Source[TextMessage.Strict, Unit] =
counterSource
.map(_.toString)
.map(TextMessage.Strict)
.prepend(Source.fromIterator(() => {
counterSourceQueue.offer(1)
Iterator.empty
}))
.watchTermination()((_, done) => done.foreach(_ => counterSourceQueue.offer(-1)))
def main(args: Array[String]): Unit = {
val route: Route =
pathEndOrSingleSlash {
get {
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,
"<p>Connections: <span id='c'>?</span></p>" +
"<script>const ws = new WebSocket(\"ws://localhost:8080/ws\");ws.onmessage = e => { console.log(e.data); document.getElementById('c').innerText = e.data; };</script>"))
}
} ~
path("ws") {
handleWebSocketMessages(
Flow.fromSinkAndSourceCoupled(
Sink.ignore,
clientFlow))
}
Http().bindAndHandle(route, "0.0.0.0", 8080)
() // discard non-Unit value
}
}
PS: очень хороший пример!