Завершить запрос с последним товаром в потоке
Я хотел бы завершить GET
запрос с последним доступным товаром в потоке. Этот поток, в частности, объединяет события, созданные актером и уже индивидуально используемые WebSocket.
Допустим, событие можно представить следующим образом:
final case class Event(id: String, value: Double)
Первое, что я делаю, это создание SourceQueue
где субъект будет отправлять события и концентратор, чтобы разные клиенты могли получать эти события независимо:
val (queue, hub) =
Source.queue[Event](256, OverflowStrategy.dropHead).
toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both).run()
Затем я могу создать актера, который может выдвигать события на queue
и передать hub
на сервис, который обслуживает события через WebSocket:
extractUpgradeToWebSocket { upgrade =>
complete(upgrade.handleMessagesWithSinkSource(
inSink = Sink.ignore,
outSource =
hub.map(in => TextMessage(fmt.write(in).toString()))
))
}
Это отлично работает, в том числе с несколькими потребителями одновременно.
То, что я хотел бы сделать дальше, это иметь службу, которая потребляет события из hub
и создает список последних событий для каждого идентификатора, обслуживая его через GET
конечная точка.
Я попробовал несколько подходов, чтобы решить этот. Два подхода, которые я попробовал, были:
- запустить поток, который обновляет приватную переменную
- в комплекте с раковиной, которая возвращает
last
элемент
Запустите поток, который обновляет приватную переменную
На самом деле это последний подход, который я попробовал. Странная (или это?) Мысль, которую я заметил, заключается в том, что на самом деле ничего не регистрируется (не должно пройти через log
комбинатор будет зарегестрирован?).
Результатом использования этого подхода является то, что latest
всегда null
и ответ, таким образом, всегда пуст.
final class Service(hub: Source[Event, NotUsed])(implicit s: ActorSystem, m: ActorMaterializer, t: Timeout) extends Directives with JsonSupport {
implicit private val executor = system.dispatcher
@volatile private[this] var latest: List[Event] = _
hub.
log("hub", identity).
groupBy(Int.MaxValue, { case Event(id, _) => id }).
map { case event @ Event(id, _) => Map(id -> event) }.
reduce(_ ++ _).
mergeSubstreams.
map(_.values.toList).
toMat(Sink.foreach(latest = _))(Keep.none).run()
val definition = get { complete(Option(latest)) }
}
Я также попробовал аналогичный подход, который использует "блочного" субъекта и передает ему агрегаты, но эффект тот же.
В комплекте с раковиной, которая возвращает last
элемент
Это первый подход, который я попытался применить. В результате ответ зависает до истечения времени ожидания, и Akka HTTP возвращает 500 в браузер.
final class Service(hub: Source[Event, NotUsed])(implicit s: ActorSystem, m: ActorMaterializer, t: Timeout) extends Directives with JsonSupport {
implicit private val executor = system.dispatcher
private[this] val currentLocations =
hub.
groupBy(Int.MaxValue, { case Event(id, _) => id }).
map { case event @ Event(id, _) => Map(id -> event) }.
reduce(_ ++ _).
mergeSubstreams.
map(_.values.toList).
runWith(Sink.reduce((_, next) => next))
val definition = get { complete(currentLocations) }
}
1 ответ
ActorRef
Как раковина
Вы можете создать Actor
это работает Map
из id
в Event
:
import scala.collection.immutable
object QueryMap
class MapKeeperActor() extends Actor {
var internalMap = immutable.Map.empty[String, Event]
override def receive = {
case e : Event => internalMap = internalMap + (e.id -> e)
case _ : QueryMap => sender ! internalMap
}
}
Эта ссылка может быть использована в Sink
который будет прикреплен к BroadcastHub
:
object OnCompleteMessage
val system : ActorSystem = ???
val mapKeeperRef = system.actorOf(Props[MapKeeperActor])
val mapKeeperSink : Sink[Event, _] = Sink.actorRef[Event](mapKeeperRef, OnCompleteMessage)
Query Actor in Route
Теперь мы можем создать Route
который запросит хранителя карты с помощью директив. Тем не менее, вам придется решить, как сериализовать Map
в ResponseEntity
для HttpResponse
:
val serializeMap : Map[String, Event] => ResponseEntity = ???
val route =
get {
onComplete( (mapKeeperRef ? QueryMap).mapTo[Map[String, Event]]) {
case Success(map) => complete(HttpResponse(entity=serializeMap(map))
case Failure(ex) => complete((InternalServerError, s"An error occurred: ${ex.getMessage}"))
}
}