Запись элементов в файл по мере их удаления из очереди: Scala fs2 Stream
У меня есть небольшой тест потоков fs2, элементов процесса, ожидания, а затем записи их в файл. Я получаю сообщение об ошибке типа и не могу понять, что это означает:
Ошибка: required: fs2.Stream[[x]cats.effect.IO[x],Unit] => fs2.Stream[[+A]cats.effect.IO[A],Unit],
found : [F[_]]fs2.Pipe[F,Byte,Unit]
импортировать java.nio.file.Paths
import cats.effect.{Blocker, ExitCode, IO, IOApp, Timer}
import fs2.Stream
import fs2.io
import fs2.concurrent.Queue
import scala.concurrent.duration._
import scala.util.Random
class StreamTypeIntToDouble(q: Queue[IO, Int])(implicit timer: Timer[IO]) {
import core.Processing._
val blocker: Blocker =
Blocker.liftExecutionContext(
scala.concurrent.ExecutionContext.Implicits.global
)
def storeInQueue: Stream[IO, Unit] = {
Stream(1, 2, 3)
.covary[IO]
.evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
.metered(Random.between(1, 20).seconds)
.through(q.enqueue)
}
def getFromQueue: Stream[IO, Unit] = {
q.dequeue
.evalMap(n => IO.delay(println(s"Pulling from queue $n")))
.through(
io.file
.writeAll(Paths.get("file.txt"), blocker)
)
}
}
object Five extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q <- Queue.bounded[IO, Int](10)
b = new StreamTypeIntToDouble(q)
_ <- b.storeInQueue.compile.drain.start
_ <- b.getFromQueue.compile.drain
} yield ()
program.as(ExitCode.Success)
}
}
1 ответ
Здесь есть пара проблем, и первая из них самая запутанная. writeAll
полиморфен в своем контексте F[_]
, но для этого требуется ContextShift
пример для F
(также как и Sync
). В настоящее время у вас нетContextShift[IO]
в области видимости, поэтому компилятор не сделает вывод, что F
за writeAll
должно быть IO
. Если вы добавите что-то вроде этого:
implicit val ioContextShift: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)
â € тогда компилятор сделает вывод IO
как и следовало ожидать.
В таких случаях я предлагаю пропустить вывод типа. Записать его с параметром типа лишь немного сложнее:
.through(
io.file
.writeAll[IO](Paths.get("file.txt"), blocker)
)
• а это означает, что вы будете получать полезные сообщения об ошибках, например, об отсутствии экземпляров классов типов.
После того, как вы исправите эту проблему, появится еще пара. Следующее - использованиеevalMap
в этом контексте означает, что у вас будет поток ()
ценности. Если вы измените его наevalTap
, побочные эффекты регистрации по-прежнему будут происходить надлежащим образом, но вы не потеряете фактические значения потока, для которого вы его вызываете.
Последняя проблема в том, что writeAll
требует поток байтов, а вы дали ему поток Int
с. Как вы хотите справиться с этим несоответствием, зависит от предполагаемой семантики, но для примера что-то вроде.map(_.toByte)
заставит его скомпилировать.