FS2 поток для непрочитанного InputStream
Я хотел бы конвертировать fs2.Stream
в java.io.InputStream
поэтому я могу передать этот входной поток в http-фреймворк (Finch и Akka Http).
Я нашел fs2.io.toInputStream
, но это не работает (ничего не печатает):
import java.io.{ByteArrayInputStream, InputStream}
import cats.effect.IO
import scala.concurrent.ExecutionContext.Implicits.global
object IOTest {
def main(args: Array[String]): Unit = {
val is: InputStream = new ByteArrayInputStream("test".getBytes)
val stream: fs2.Stream[IO, Byte] = fs2.io.readInputStream(IO(is), 128)
val test: Seq[InputStream] = stream.through(fs2.io.toInputStream).compile.toList.unsafeRunSync()
println(scala.io.Source.fromInputStream(test.head).mkString)
}
}
Насколько я понимаю, когда я бегу .unsafeRunSync()
он потребляет весь поток, так что даже если он возвращает Seq[InputStream]
подстилающий входной поток уже израсходован.
Есть ли способ, которым я могу преобразовать fs2.Stream[IO, Byte]
в java.io.InputStream
без потребления?
Thnaks!
0 ответов
Проблема в том, что compile
вызывается преждевременно. Я уверен что под капотом fs2.io.toInputStream
делает правильную вещь и заключает в скобки созданное InputStream
, Это означает, что InputStream
должны быть доступны внутри Stream
сам (например, в map
/flatMap
вызов):
val wire: fs2.Stream[IO, Byte] = ???
val result: fs2.Stream[IO, String] = for {
is <- wire.through(fs2.io.toInputStream)
str = scala.io.Source.fromInputStream(is).mkString //<--- use the InputStream here
} yield str
println( result.compile.lastOrError.unsafeRunSync() ) //<--- compile at the _very_ end
Выходы:
тестовое задание
Похоже, Finch имеет поддержку fs2 https://github.com/finagle/finch/tree/master/fs2, а Akka также имеет потоковую реализацию и есть библиотеки взаимодействия fs2 - Akka Stream, такие как https://github.com/krasserm/streamz/tree/master/streamz-converter
Поэтому я рекомендую вам взглянуть на реализации, потому что они заботятся о жизненном цикле ресурсов. Возможно, вам не нужна вся библиотека, но она служит ориентиром.
И если вы начинаете с "безопасной зоны" с fs2, зачем уходить оттуда:)