Завершить запрос с последним товаром в потоке

Я хотел бы завершить GET запрос с последним доступным товаром в потоке. Этот поток, в частности, объединяет события, созданные актером и уже индивидуально используемые WebSocket.

Допустим, событие можно представить следующим образом:

final case class Event(id: String, value: Double)

Первое, что я делаю, это создание SourceQueue где субъект будет отправлять события и концентратор, чтобы разные клиенты могли получать эти события независимо:

val (queue, hub) =
  Source.queue[Event](256, OverflowStrategy.dropHead).
    toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both).run()

Затем я могу создать актера, который может выдвигать события на queue и передать hub на сервис, который обслуживает события через WebSocket:

extractUpgradeToWebSocket { upgrade =>
  complete(upgrade.handleMessagesWithSinkSource(
    inSink = Sink.ignore,
    outSource =
      hub.map(in => TextMessage(fmt.write(in).toString()))
  ))
}

Это отлично работает, в том числе с несколькими потребителями одновременно.

То, что я хотел бы сделать дальше, это иметь службу, которая потребляет события из hub и создает список последних событий для каждого идентификатора, обслуживая его через GET конечная точка.

Я попробовал несколько подходов, чтобы решить этот. Два подхода, которые я попробовал, были:

  • запустить поток, который обновляет приватную переменную
  • в комплекте с раковиной, которая возвращает last элемент

Запустите поток, который обновляет приватную переменную

На самом деле это последний подход, который я попробовал. Странная (или это?) Мысль, которую я заметил, заключается в том, что на самом деле ничего не регистрируется (не должно пройти через log комбинатор будет зарегестрирован?).

Результатом использования этого подхода является то, что latest всегда null и ответ, таким образом, всегда пуст.

final class Service(hub: Source[Event, NotUsed])(implicit s: ActorSystem, m: ActorMaterializer, t: Timeout) extends Directives with JsonSupport {

  implicit private val executor = system.dispatcher

  @volatile private[this] var latest: List[Event] = _

  hub.
    log("hub", identity).
    groupBy(Int.MaxValue, { case Event(id, _) => id }).
    map { case event @ Event(id, _) => Map(id -> event) }.
    reduce(_ ++ _).
    mergeSubstreams.
    map(_.values.toList).
    toMat(Sink.foreach(latest = _))(Keep.none).run()

  val definition = get { complete(Option(latest)) }

}

Я также попробовал аналогичный подход, который использует "блочного" субъекта и передает ему агрегаты, но эффект тот же.

В комплекте с раковиной, которая возвращает last элемент

Это первый подход, который я попытался применить. В результате ответ зависает до истечения времени ожидания, и Akka HTTP возвращает 500 в браузер.

final class Service(hub: Source[Event, NotUsed])(implicit s: ActorSystem, m: ActorMaterializer, t: Timeout) extends Directives with JsonSupport {

  implicit private val executor = system.dispatcher
  private[this] val currentLocations =
    hub.
      groupBy(Int.MaxValue, { case Event(id, _) => id }).
      map { case event @ Event(id, _) => Map(id -> event) }.
      reduce(_ ++ _).
      mergeSubstreams.
      map(_.values.toList).
      runWith(Sink.reduce((_, next) => next))

  val definition = get { complete(currentLocations) }

}

1 ответ

ActorRefКак раковина

Вы можете создать Actor это работает Map из id в Event:

import scala.collection.immutable

object QueryMap

class MapKeeperActor() extends Actor {

  var internalMap = immutable.Map.empty[String, Event]

  override def receive = {
    case e : Event    => internalMap = internalMap + (e.id -> e)
    case _ : QueryMap => sender ! internalMap
  }
}

Эта ссылка может быть использована в Sink который будет прикреплен к BroadcastHub:

object OnCompleteMessage

val system : ActorSystem = ???

val mapKeeperRef = system.actorOf(Props[MapKeeperActor])

val mapKeeperSink : Sink[Event, _] = Sink.actorRef[Event](mapKeeperRef, OnCompleteMessage)

Query Actor in Route

Теперь мы можем создать Route который запросит хранителя карты с помощью директив. Тем не менее, вам придется решить, как сериализовать Map в ResponseEntity для HttpResponse:

val serializeMap : Map[String, Event] => ResponseEntity = ???

val route = 
  get {
    onComplete( (mapKeeperRef ? QueryMap).mapTo[Map[String, Event]]) {
      case Success(map) => complete(HttpResponse(entity=serializeMap(map))
      case Failure(ex)  => complete((InternalServerError, s"An error occurred: ${ex.getMessage}"))
    }
  }
Другие вопросы по тегам