Как отправить в реальном времени количество активных подключений WebSocket через HTTP WebSocket Akka? (работает только с Akka Streams)

Как можно посчитать в реальном времени количество активных подключений WebSocket через Akka WebSocket? Кажется, что Akka HTTP WebSockets изменяет ход предварительной подготовки Akka Stream.

В первом блоке кода, приведенном ниже, счетчик отправляется только тогда, когда это единственное соединение. Любые клиенты, которые подключаются, когда число уже превышает 1, не получают обновления до тех пор, пока не подключится следующий клиент.

Во втором блоке кода удален код WebSocket, поэтому используются только потоки Akka, и каждая подписка на поток получает немедленный счет, как показано в stdout.

Использование Akka HTTP WebSocket

Запуск этого кода и доступ к http://localhost:8080/ с несколькими окнами браузера показывает это в их браузерных консолях.

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.TextMessage
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.ExecutionContextExecutor

object CounterFlow {

    private implicit val actorSystem: ActorSystem = ActorSystem("CounterFlowTest")
    private implicit val materializer: ActorMaterializer = ActorMaterializer()
    private implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher

    val (counterSourceQueue, counterSource) =
        Source.queue[Int](0, OverflowStrategy.backpressure)
            .conflate(_ + _)
            .scan(0)(_ + _)
            .toMat(BroadcastHub.sink(bufferSize = 1))(Keep.both)
            .run()

    val clientFlow: Source[TextMessage.Strict, Unit] =
        counterSource
            .map(_.toString)
            .map(TextMessage.Strict)
            .prepend(Source.fromIterator(() => {
                counterSourceQueue.offer(1)
                Iterator.empty
            }))
            .watchTermination()((_, done) => done.foreach(_ => counterSourceQueue.offer(-1)))

    def main(args: Array[String]): Unit = {

        val route: Route =
            pathEndOrSingleSlash {
                get {
                    complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,
                        "<h1>Check browser console for connection count</h1>" +
                            "<script>const ws = new WebSocket(\"ws://localhost:8080/ws\");ws.onmessage = e => console.log(e.data);</script>"))
                }
            } ~
                path("ws") {
                    handleWebSocketMessages(
                        Flow.fromSinkAndSourceCoupled(
                            Sink.ignore,
                            clientFlow))
                }

        Http().bindAndHandle(route, "0.0.0.0", 8080)
        () // discard non-Unit value
    }
}

Только Akka Streams

Как видно на stdout, каждый клиент получает немедленный счет по желанию.

import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.ExecutionContext.Implicits.global

object CounterFlow {

    private implicit val actorSystem: ActorSystem = ActorSystem("CounterFlowTEst")
    private implicit val materializer: ActorMaterializer = ActorMaterializer()

    val (counterSourceQueue, counterSource) = Source.queue[Int](0, OverflowStrategy.backpressure)
        .conflate(_ + _)
        .scan(0)(_ + _)
        .toMat(BroadcastHub.sink)(Keep.both)
        .run()

    def subscribeClient(clientName: String): Unit =
        counterSource
            .prepend(Source.fromIterator(() => {
                counterSourceQueue.offer(1)
                Iterator.empty
            }))
            .watchTermination()((_, done) => done.foreach(_ => counterSourceQueue.offer(-1)))
            .runWith(Sink.foreach(msg => println(s"$clientName $msg")))
            .foreach(_ => println(s"$clientName Done"))

    def main(args: Array[String]): Unit = {
        subscribeClient("A")
        subscribeClient("B")
        Thread.sleep(500L)
        subscribeClient("C")
    }
}

1 ответ

Путем изменения BroadcastHub.sink(bufferSize = 1) в BroadcastHub.sink (как в версии Akka Streams) каждый клиент получает счет немедленно.

Вот версия, с которой я тестировал:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.TextMessage
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.ExecutionContextExecutor

object CounterFlow {

    private implicit val actorSystem: ActorSystem = ActorSystem("CounterFlowTest")
    private implicit val materializer: ActorMaterializer = ActorMaterializer()
    private implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher

    val (counterSourceQueue, counterSource) =
        Source.queue[Int](0, OverflowStrategy.backpressure)
            .conflate(_ + _)
            .scan(0)(_ + _)
            .toMat(BroadcastHub.sink)(Keep.both)
            .run()

    val clientFlow: Source[TextMessage.Strict, Unit] =
        counterSource
            .map(_.toString)
            .map(TextMessage.Strict)
            .prepend(Source.fromIterator(() => {
                counterSourceQueue.offer(1)
                Iterator.empty
            }))
            .watchTermination()((_, done) => done.foreach(_ => counterSourceQueue.offer(-1)))

    def main(args: Array[String]): Unit = {

        val route: Route =
            pathEndOrSingleSlash {
                get {
                    complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,
                        "<p>Connections: <span id='c'>?</span></p>" +
                            "<script>const ws = new WebSocket(\"ws://localhost:8080/ws\");ws.onmessage = e => { console.log(e.data); document.getElementById('c').innerText = e.data; };</script>"))
                }
            } ~
                path("ws") {
                    handleWebSocketMessages(
                        Flow.fromSinkAndSourceCoupled(
                            Sink.ignore,
                            clientFlow))
                }

        Http().bindAndHandle(route, "0.0.0.0", 8080)
        () // discard non-Unit value
    }
}

PS: очень хороший пример!

Другие вопросы по тегам