Изменить материализованное значение в источнике, используя содержимое потока

Alpakka предоставляет отличный способ доступа к десяткам различных источников данных. Файловые источники, такие как HDFS и FTP, поставляются в виде Source[ByteString, Future[IOResult], Однако HTTP-запросы через Akka HTTP доставляются в виде потоков сущностей Source[ByteString, NotUsed], В моем случае я хотел бы получить контент из источников HTTP как Source[ByteString, Future[IOResult] поэтому я могу создать унифицированную систему извлечения ресурсов, которая работает из нескольких схем (hdfs, file, ftp и S3 в данном случае).

В частности, я хотел бы преобразовать Source[ByteString, NotUsed] источник к Source[ByteString, Future[IOResult] где я могу рассчитать IOResult из входящего потока байтов. Есть много методов, таких как flatMapConcat а также viaMat но ни один из них не может извлечь подробности из входного потока (например, количество прочитанных байтов) или инициализировать IOResult структура правильно. В идеале я ищу метод со следующей сигнатурой, который будет обновлять IOResult при поступлении потока.

  def matCalc(src: Source[ByteString, Any]) = Source[ByteString, Future[IOResult]] = {
    src.someMatFoldMagic[ByteString, IOResult](IOResult.createSuccessful(0))(m, b) => m.withCount(m.count + b.length))
  }

2 ответа

Решение

Я не могу вспомнить ни одной существующей функциональности, которая может из коробки сделать это, но вы можете использовать также функцию потока ToMat (удивительно, не нашел ее в документах akka streams, хотя вы можете посмотреть это в документации по исходному коду и Java API) вместе с Sink.fold накопить какое-то значение и отдать его в самый конец. например:

def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
    source.alsoToMat(Sink.fold(0)((acc, _) => acc + 1))((_, f) => f)

дело в том, что alsoToMat объединяет значение входного мата с указанным в alsoToMat, в то же время значения, полученные источником, не зависят от alsoToMat:

def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out, Mat3] =
  viaMat(alsoToGraph(that))(matF)

не так сложно адаптировать эту функцию для возврата IOResult, что в соответствии с исходным кодом:

final case class IOResult(count: Long, status: Try[Done]) { ... }

еще одна последняя вещь, на которую вам нужно обратить внимание - вы хотите, чтобы ваш источник был похож на:

Source[ByteString, Future[IOResult]]

но если вы не хотите переносить эти значения mat до самого конца определения потока, а затем делать что-то на основе этого будущего завершения, это может быть подвержено ошибкам. Например, в этом примере я заканчиваю работу на основе этого будущего, поэтому последнее значение не будет обработано:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object App extends App {

  private implicit val sys: ActorSystem = ActorSystem()
  private implicit val mat: ActorMaterializer = ActorMaterializer()
  private implicit val ec: ExecutionContext = sys.dispatcher

  val source: Source[Int, Any] = Source((1 to 5).toList)

  def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
    source.alsoToMat(Sink.fold(0)((acc, _) => acc + 1))((_, f) => f)

  val f = magic(source).throttle(1, 1.second).toMat(Sink.foreach(println))(Keep.left).run()
  f.onComplete(t => println(s"f1 completed - $t"))
  Await.ready(f, 5.minutes)


  mat.shutdown()
  sys.terminate()
}

Это можно сделать с помощью Promise для распространения материализованного значения.

val completion = Promise[IoResult]
val httpWithIoResult = http.mapMaterializedValue(_ => completion.future)

Теперь осталось завершить completion обещать, когда соответствующие данные станут доступными.

Альтернативный подход заключается в том, чтобы перейти к GraphStage API, где вы получаете более низкий уровень контроля распространения материализованного значения. Но даже там, используя Promises часто выбранная реализация для распространения материализованного значения. Взгляните на встроенные реализации операторов, такие как Ignore,

Другие вопросы по тегам