Поток FS2 работает до конца InputStream

Я очень новичок в FS2 и мне нужна помощь в разработке. Я пытаюсь создать поток, который будет тянуть куски из нижележащего InputStream пока все не закончится. Вот что я попробовал:

import java.io.{File, FileInputStream, InputStream}

import cats.effect.IO
import cats.effect.IO._

object Fs2 {

  def main(args: Array[String]): Unit = {
    val is = new FileInputStream(new File("/tmp/my-file.mf"))
    val stream = fs2.Stream.eval(read(is))
    stream.compile.drain.unsafeRunSync()
  }

  def read(is: InputStream): IO[Array[Byte]] = IO {
    val buf = new Array[Byte](4096)
    is.read(buf)
    println(new String(buf))
    buf
  }
}

И программа печатает только первый кусок. Это разумно. Но я хочу найти способ "сигнализировать", где прекратить чтение, а где не остановить. Я имею в виду продолжать звонить read(is) до конца. Есть ли способ достичь этого?

Я тоже пробовал repeatEval(read(is)) но он продолжает читать вечно... Мне нужно что-то среднее.

1 ответ

Решение

Использование fs2.io.readInputStreamили же fs2.io.readInputStreamAsync, Первый блокирует текущий поток; последний блокирует поток в ExecutionContext, Например:

val is: InputStream = new FileInputStream(new File("/tmp/my-file.mf"))
val stream = fs2.io.readInputStreamAsync(IO(is), 128)
Другие вопросы по тегам