FS2 поток для непрочитанного InputStream

Я хотел бы конвертировать fs2.Stream в java.io.InputStream поэтому я могу передать этот входной поток в http-фреймворк (Finch и Akka Http).

Я нашел fs2.io.toInputStream, но это не работает (ничего не печатает):

import java.io.{ByteArrayInputStream, InputStream}

import cats.effect.IO
import scala.concurrent.ExecutionContext.Implicits.global

object IOTest {

  def main(args: Array[String]): Unit = {
    val is: InputStream = new ByteArrayInputStream("test".getBytes)
    val stream: fs2.Stream[IO, Byte] = fs2.io.readInputStream(IO(is), 128)

    val test: Seq[InputStream] = stream.through(fs2.io.toInputStream).compile.toList.unsafeRunSync()

    println(scala.io.Source.fromInputStream(test.head).mkString)
  }
}

Насколько я понимаю, когда я бегу .unsafeRunSync() он потребляет весь поток, так что даже если он возвращает Seq[InputStream] подстилающий входной поток уже израсходован.

Есть ли способ, которым я могу преобразовать fs2.Stream[IO, Byte] в java.io.InputStream без потребления?

Thnaks!

0 ответов

Проблема в том, что compile вызывается преждевременно. Я уверен что под капотом fs2.io.toInputStream делает правильную вещь и заключает в скобки созданное InputStream, Это означает, что InputStream должны быть доступны внутри Stream сам (например, в map/flatMap вызов):

val wire: fs2.Stream[IO, Byte] = ???

val result: fs2.Stream[IO, String] = for {
  is <- wire.through(fs2.io.toInputStream)
  str = scala.io.Source.fromInputStream(is).mkString //<--- use the InputStream here
} yield str

println( result.compile.lastOrError.unsafeRunSync() ) //<--- compile at the _very_ end

Выходы:

тестовое задание

Похоже, Finch имеет поддержку fs2 https://github.com/finagle/finch/tree/master/fs2, а Akka также имеет потоковую реализацию и есть библиотеки взаимодействия fs2 - Akka Stream, такие как https://github.com/krasserm/streamz/tree/master/streamz-converter

Поэтому я рекомендую вам взглянуть на реализации, потому что они заботятся о жизненном цикле ресурсов. Возможно, вам не нужна вся библиотека, но она служит ориентиром.

И если вы начинаете с "безопасной зоны" с fs2, зачем уходить оттуда:)

Другие вопросы по тегам