Отправка ответа от абонента astic4s обратно в akka-поток
В настоящее время я создаю решение для потоковой передачи данных из mongoDb в asticsearch. Моя цель состоит в том, чтобы отслеживать все успешно переданные элементы в эластичный поиск. Я использую akka-streams и astic4s. В настоящее время потоковая передача в ES выглядит следующим образом
val esSubscriber: BulkIndexingSubscriber[CustomT] = esClient.subscriber[CustomT](
batchSize = batchSize,
completionFn = { () => elasticFinishPromise.success(()); ()},
errorFn = { (t: Throwable) => elasticFinishPromise.failure(t); ()},
concurrentRequests = concurrentRequests
)
val esSink: Sink[CustomT, NotUsed] = Sink.fromSubscriber(esSubscriber)
И из моего источника что-то вроде этого:
val a: [NotUsed] = mongoSrc
.via(some operations..)
.to(esSink)
.run()
Теперь все работает нормально, и сейчас я регистрирую, например, количество предметов со второй раковиной. Но я бы лучше занес в журнал те вещи, которые действительно были переданы в эластичный поиск. Абонент astic4s предлагает listener: ResponseListener
с onAck(): Unit
а также onFailure(): Unit
и я хотел бы получить эту информацию обратно в поток, как это
val mongoSrc: [Source..]
val doStuff: [Flow..]
val esSink: [Flow..] //now as flow instead of sink
val logSink: [Sink[Int...]] //now gets for example a 1 for each successful transported item
mongoSrc ~> doStuff ~> esSink ~> logSink
Как бы я это реализовал? Нужен ли мне пользовательский этап, который буферизует элементы onAck
и onFailure
? Или есть более простой способ?
Спасибо за любую помощь.
1 ответ
Вы можете "расцвести" Subscriber[T]
опускаться, используя Flow.fromSinkAndSource
, Посмотрите на иллюстрацию "Составной поток (из раковины и источника)" из документов.
В этом случае вы будете прикреплять свой пользовательский actorPublisher в качестве источника и отправлять ему сообщения от onAck()
,
Так как вы попросили более простой способ:
val doStuff = Flow[DocToIndex]
.grouped(batchSize)
.mapAsync(concurrentRequests)(bulkopFuture)
В двух словах и в стороне от всех полезных абстракций, подписчик astic4s - это просто массовый запрос на обновление.