Отправка ответа от абонента astic4s обратно в akka-поток

В настоящее время я создаю решение для потоковой передачи данных из mongoDb в asticsearch. Моя цель состоит в том, чтобы отслеживать все успешно переданные элементы в эластичный поиск. Я использую akka-streams и astic4s. В настоящее время потоковая передача в ES выглядит следующим образом

val esSubscriber: BulkIndexingSubscriber[CustomT] = esClient.subscriber[CustomT](
    batchSize = batchSize,
    completionFn = { () => elasticFinishPromise.success(()); ()},
    errorFn = { (t: Throwable) => elasticFinishPromise.failure(t); ()},
    concurrentRequests = concurrentRequests
    )
val esSink: Sink[CustomT, NotUsed] = Sink.fromSubscriber(esSubscriber)

И из моего источника что-то вроде этого:

val a: [NotUsed] = mongoSrc
  .via(some operations..)
  .to(esSink)
  .run()

Теперь все работает нормально, и сейчас я регистрирую, например, количество предметов со второй раковиной. Но я бы лучше занес в журнал те вещи, которые действительно были переданы в эластичный поиск. Абонент astic4s предлагает listener: ResponseListener с onAck(): Unit а также onFailure(): Unit и я хотел бы получить эту информацию обратно в поток, как это

val mongoSrc: [Source..]
val doStuff: [Flow..]
val esSink: [Flow..] //now as flow instead of sink
val logSink: [Sink[Int...]] //now gets for example a 1 for each successful transported item

mongoSrc ~> doStuff ~> esSink ~> logSink

Как бы я это реализовал? Нужен ли мне пользовательский этап, который буферизует элементы onAck и onFailure? Или есть более простой способ?

Спасибо за любую помощь.

1 ответ

Вы можете "расцвести" Subscriber[T] опускаться, используя Flow.fromSinkAndSource, Посмотрите на иллюстрацию "Составной поток (из раковины и источника)" из документов.

В этом случае вы будете прикреплять свой пользовательский actorPublisher в качестве источника и отправлять ему сообщения от onAck(),

Так как вы попросили более простой способ:

val doStuff = Flow[DocToIndex]
                .grouped(batchSize)
                .mapAsync(concurrentRequests)(bulkopFuture)

В двух словах и в стороне от всех полезных абстракций, подписчик astic4s - это просто массовый запрос на обновление.

Другие вопросы по тегам