Моделирование нескольких вызовов функций с помощью потока (безопасным способом FP)

Учитывая функцию A => IO[B] (ака Kleisli[IO, A, B]) который должен вызываться несколько раз и имеет побочные эффекты, такие как обновление БД, как делегировать такие множественные вызовы в поток (я полагаю, Pipe[IO, A, B]) (fs2, monix наблюдаемый / итерант)? Причина этого в том, чтобы иметь возможность накапливать состояние, выполнять групповые вызовы вместе в течение временного окна и т. Д.

Конкретнее, серверу http4s требуется Request => IO[Response], поэтому я смотрю, как работать с потоками (для вышеуказанных преимуществ), но в конечном итоге предоставить такую ​​функцию для http4s.

Я подозреваю, что для этого понадобится некоторый идентификатор корреляции за кулисами, и я в порядке с этим, меня больше интересует, как сделать это безопасно и правильно с точки зрения FP.

В конечном счете, подпись, которую я ожидаю, выглядит примерно так:

Pipe[IO, A, B] => (A => IO[B]), такие, что звонки в Клейсли проходят по трубе.

Как запоздалая мысль, будет ли вообще возможно противодавление?

0 ответов

Одна из идей - смоделировать его с помощью MPSC (Multiple Publisher Single Consumer). Я приведу пример с Monix, поскольку я более знаком с ним, но идея остается прежней, даже если вы используете FS2.

object MPSC extends App {

  sealed trait Event
  object Event {
    // You'll need a promise in order to send the response back to user
    case class SaveItem(num: Int, promise: Deferred[Task, Int]) extends Event
  }

  // For backpressure, take a look at `PublishSubject`.
  val cs = ConcurrentSubject[Event](MulticastStrategy.Publish)

  def pushEvent(num: Int) = {
    for {
      promise <- Deferred[Task, Int]
      _ <- Task.delay(cs.onNext(SaveItem(num, promise)))
    } yield promise
  }

  // You get a list of events now since it is buffered
  // Monix has a lot of buffer strategies, check the docs for more details
  def processEvents(items: Seq[Event]): Task[Unit] = {
    Task.delay(println(s"Items: $items")) >>
      Task.traverse(items) {
        case SaveItem(_, promise) => promise.complete(Random.nextInt(100))
      }.void
  }

  val app = for {
    // Start the stream in the background
    _ <- cs
      .bufferTimed(3.seconds) // Buffer all events within 3 seconds
      .filter(_.nonEmpty)
      .mapEval(processEvents)
      .completedL
      .startAndForget

    _ <- Task.sleep(1.second)
    p1 <- pushEvent(10)
    p2 <- pushEvent(20)
    p3 <- pushEvent(30)

    // Wait for the promise to complete, you'll do this for each request
    x <- p1.get
    y <- p2.get
    z <- p3.get

    _ <- Task.delay(println(s"Completed promise: [$x, $y, $z]"))
  } yield ()

  app.runSyncUnsafe()
}
Другие вопросы по тегам