Как использовать IO с итерациями Scalaz7 без переполнения стека?
Рассмотрим этот код (взятый отсюда и измененный для использования байтов, а не строк символов).
import java.io.{ File, InputStream, BufferedInputStream, FileInputStream }
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ }
import std.list._
object IterateeIOExample {
type ErrorOr[+A] = EitherT[IO, Throwable, A]
def openStream(f: File) = IO(new BufferedInputStream(new FileInputStream(f)))
def readByte(s: InputStream) = IO(Some(s.read()).filter(_ != -1))
def closeStream(s: InputStream) = IO(s.close())
def tryIO[A, B](action: IO[B]) = I.iterateeT[A, ErrorOr, B] {
EitherT(action.catchLeft).map(r => I.sdone(r, I.emptyInput))
}
def enumBuffered(r: => BufferedInputStream) = new EnumeratorT[Int, ErrorOr] {
lazy val reader = r
def apply[A] = (s: StepT[Int, ErrorOr, A]) => s.mapCont(k =>
tryIO(readByte(reader)) flatMap {
case None => s.pointI
case Some(byte) => k(I.elInput(byte)) >>== apply[A]
})
}
def enumFile(f: File) = new EnumeratorT[Int, ErrorOr] {
def apply[A] = (s: StepT[Int, ErrorOr, A]) =>
tryIO(openStream(f)).flatMap(stream => I.iterateeT[Int, ErrorOr, A](
EitherT(
enumBuffered(stream).apply(s).value.run.ensuring(closeStream(stream)))))
}
def main(args: Array[String]) {
val action = (
I.consume[Int, ErrorOr, List] &=
enumFile(new File(args(0)))).run.run
println(action.unsafePerformIO())
}
}
Выполнение этого кода в файле приличного размера (8 КБ) приводит к исключению StackruException. В результате некоторых поисков выяснилось, что исключения можно избежать, используя вместо IO монаду Trampoline, но это не кажется хорошим решением - пожертвуйте функциональной чистотой, чтобы программа вообще завершилась. Очевидный способ исправить это - использовать IO или Trampoline в качестве Monad Transformer, чтобы обернуть другой, но я не могу найти реализацию трансформаторной версии любого из них, и мне не хватает гуру функционального программирования, чтобы знаю, как написать свой собственный (узнать больше о FP - одна из целей этого проекта, но я подозреваю, что создание новых монадных трансформаторов сейчас немного выше моего уровня). Я полагаю, что я мог бы просто обернуть большое действие ввода-вывода вокруг создания, выполнения и возврата результатов моих итераторов, но это скорее обходной путь, чем решение.
Предположительно некоторые монады не могут быть преобразованы в преобразователи монад, поэтому я хотел бы знать, возможно ли работать с большими файлами без потери ввода-вывода или переполнения стека, и если да, то как?
Дополнительный вопрос: я не могу придумать, каким образом итератор может сигнализировать о том, что он обнаружил ошибку при обработке, за исключением того, что он должен возвращать Either, что упрощает их составление. Приведенный выше код показывает, как использовать EitherT для обработки ошибок в перечислителе, но как это работает для итераторов?
1 ответ
Создав исключения и напечатав их длину стека в разных местах вашего кода, я почувствовал, что ваш код не переполняется. Кажется, что все работает в постоянном размере стека. Поэтому я искал другие места. В конце концов я скопировал реализацию consume
и добавил немного глубины печати стека и подтвердил, что он переполнен там.
Так что это переполняет:
(I.consume[Int, Id, List] &= EnumeratorT.enumStream(Stream.fill(10000)(1))).run
Но потом я узнал, что это не так:
(I.putStrTo[Int](System.out) &= EnumeratorT.enumStream(Stream.fill(10000)(1)))
.run.unsafePerformIO()
putStrTo
использования foldM
и как-то не вызывает переполнения. Так что мне интересно, consume
может быть реализовано с точки зрения foldM
, Я просто скопировал несколько вещей из потребителя и подправил, пока он не скомпилирован:
def consume1[E, F[_]:Monad, A[_]:PlusEmpty:Applicative]: IterateeT[E, F, A[E]] = {
I.foldM[E, F, A[E]](PlusEmpty[A].empty){ (acc: A[E], e: E) =>
(Applicative[A].point(e) <+> acc).point[F]
}
}
И это сработало! Печать длинного списка целых.