Обработка исключений в библиотеке iteratee без состояния ошибки

Я пытаюсь написать перечислитель для чтения файлов построчно из java.io.BufferedReader используя библиотеку iteratee в Scalaz 7, которая в настоящее время предоставляет только (чрезвычайно медленный) перечислитель для java.io.Reader,

Проблемы, с которыми я сталкиваюсь, связаны с тем, что все другие итеративные библиотеки, которыми я пользовался (например, Play 2.0 и John Millikin's).enumerator для Haskell) было состояние ошибки, как один из их Step конструкторы типа, а Scalaz 7 нет.

Моя текущая реализация

Вот что у меня сейчас есть. Сначала для некоторого импорта и IO оберток:

import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect.IO, iteratee.{ Iteratee => I, _ }

def openFile(f: File) = IO(new BufferedReader(new FileReader(f)))
def readLine(r: BufferedReader) = IO(Option(r.readLine))
def closeReader(r: BufferedReader) = IO(r.close())

И псевдоним типа, чтобы очистить вещи немного:

type ErrorOr[A] = Either[Throwable, A]

А теперь tryIO помощник, смоделированный (слабо и, вероятно, неправильно) на один в enumerator:

def tryIO[A, B](action: IO[B]) = I.iterateeT[A, IO, ErrorOr[B]](
  action.catchLeft.map(
    r => I.sdone(r, r.fold(_ => I.eofInput, _ => I.emptyInput))
  )
)

Перечислитель для BufferedReader сам:

def enumBuffered(r: => BufferedReader) = new EnumeratorT[ErrorOr[String], IO] {
  lazy val reader = r
  def apply[A] = (s: StepT[ErrorOr[String], IO, A]) => s.mapCont(k =>
    tryIO(readLine(reader)) flatMap {
      case Right(None)       => s.pointI
      case Right(Some(line)) => k(I.elInput(Right(line))) >>== apply[A]
      case Left(e)           => k(I.elInput(Left(e)))
    }
  )
}

И, наконец, перечислитель, который отвечает за открытие и закрытие читателя:

def enumFile(f: File) = new EnumeratorT[ErrorOr[String], IO] {
  def apply[A] = (s: StepT[ErrorOr[String], IO, A]) => s.mapCont(k =>
    tryIO(openFile(f)) flatMap {
      case Right(reader) => I.iterateeT(
        enumBuffered(reader).apply(s).value.ensuring(closeReader(reader))
      )
      case Left(e) => k(I.elInput(Left(e)))
    }
  )
}

Теперь предположим, например, что я хочу собрать все строки в файле, который содержит как минимум двадцать пять '0' символы в список. Я могу написать:

val action: IO[ErrorOr[List[String]]] = (
  I.consume[ErrorOr[String], IO, List] %=
  I.filter(_.fold(_ => true, _.count(_ == '0') >= 25)) &=
  enumFile(new File("big.txt"))
).run.map(_.sequence)

Во многих отношениях это, кажется, работает прекрасно: я могу начать действие с unsafePerformIO и он будет разбит на десятки миллионов строк и гигабайт данных за пару минут в постоянной памяти и без перегрузки стека, а затем закроет считыватель, когда это будет сделано. Если я дам ему имя файла, который не существует, он покорно вернет мне исключение, заключенное в Left, а также enumBuffered по крайней мере, кажется, ведет себя должным образом, если во время чтения возникает исключение.

Потенциальные проблемы

У меня есть некоторые опасения по поводу моей реализации, особенно tryIO, Например, предположим, что я пытаюсь составить несколько итераций:

val it = for {
  _ <- tryIO[Unit, Unit](IO(println("a")))
  _ <- tryIO[Unit, Unit](IO(throw new Exception("!")))
  r <- tryIO[Unit, Unit](IO(println("b")))
} yield r

Если я запускаю это, я получаю следующее:

scala> it.run.unsafePerformIO()
a
b
res11: ErrorOr[Unit] = Right(())

Если я попробую то же самое с enumerator в GHCi результат больше похож на то, что я ожидал:

...> run $ tryIO (putStrLn "a") >> tryIO (error "!") >> tryIO (putStrLn "b")
a
Left !

Я просто не вижу способа получить такое поведение без состояния ошибки в самой библиотеке iteratee.

Мои вопросы

Я не претендую на звание эксперта по итерациям, но я использовал различные реализации Haskell в нескольких проектах, чувствую, что я более или менее понимаю фундаментальные концепции, и однажды пил кофе с Олегом. Я здесь в растерянности, хотя. Это разумный способ обработки исключений при отсутствии состояния ошибки? Есть ли способ реализовать tryIO что будет вести себя больше как enumerator версия? Есть ли какая-то бомба замедленного действия, ожидающая меня в том, что моя реализация ведет себя по-другому?

2 ответа

Решение

РЕДАКТИРОВАТЬ здесь является реальным решением. Я оставил в оригинальном сообщении, потому что я думаю, что стоит увидеть образец. Что работает для Klesli работает для IterateeT

import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ }

object IterateeIOExample {
  type ErrorOr[+A] = EitherT[IO, Throwable, A]

  def openFile(f: File) = IO(new BufferedReader(new FileReader(f)))
  def readLine(r: BufferedReader) = IO(Option(r.readLine))
  def closeReader(r: BufferedReader) = IO(r.close())

  def tryIO[A, B](action: IO[B]) = I.iterateeT[A, ErrorOr, B] {
    EitherT.fromEither(action.catchLeft).map(r => I.sdone(r, I.emptyInput))
  }

  def enumBuffered(r: => BufferedReader) = new EnumeratorT[String, ErrorOr] {
    lazy val reader = r
    def apply[A] = (s: StepT[String, ErrorOr, A]) => s.mapCont(k =>
      tryIO(readLine(reader)) flatMap {
        case None => s.pointI
        case Some(line) => k(I.elInput(line)) >>== apply[A]
      })
  }

  def enumFile(f: File) = new EnumeratorT[String, ErrorOr] {
    def apply[A] = (s: StepT[String, ErrorOr, A]) => 
      tryIO(openFile(f)).flatMap(reader => I.iterateeT[String, ErrorOr, A](
        EitherT(
          enumBuffered(reader).apply(s).value.run.ensuring(closeReader(reader)))))
  }

  def main(args: Array[String]) {
    val action = (
      I.consume[String, ErrorOr, List] %=
      I.filter(a => a.count(_ == '0') >= 25) &=
      enumFile(new File(args(0)))).run.run

    println(action.unsafePerformIO().map(_.size))
  }
}

===== Оригинальный пост =====

Я чувствую, что вам нужно EitherT в миксе. Без EitherT вы просто получаете 3 левых или права. С EitherT это будет право слева.

Я думаю, что вы действительно хотите,

type ErrorOr[+A] = EitherT[IO, Throwable, A] 
I.iterateeT[A, ErrorOr, B]

Следующий код имитирует то, как вы сейчас пишете вещи. Поскольку IterateeT не имеет понятия левого и правого, когда вы его составляете, вы просто получаете кучу IO/Id.

scala> Kleisli((a:Int) => 4.right[String].point[Id])
res11: scalaz.Kleisli[scalaz.Scalaz.Id,Int,scalaz.\/[String,Int]] = scalaz.KleisliFunctions$$anon$18@73e771ca

scala> Kleisli((a:Int) => "aa".left[Int].point[Id])
res12: scalaz.Kleisli[scalaz.Scalaz.Id,Int,scalaz.\/[String,Int]] = scalaz.KleisliFunctions$$anon$18@be41b41

scala> for { a <- res11; b <- res12 } yield (a,b)
res15: scalaz.Kleisli[scalaz.Scalaz.Id,Int,(scalaz.\/[String,Int], scalaz.\/[String,Int])] = scalaz.KleisliFunctions$$anon$18@42fd1445

scala> res15.run(1)
res16: (scalaz.\/[String,Int], scalaz.\/[String,Int]) = (\/-(4),-\/(aa))

В следующем коде вместо идентификатора мы используем EitherT. Поскольку EitherT имеет такое же поведение связывания, что и Either, мы получаем то, что хотим.

scala>  type ErrorOr[+A] = EitherT[Id, String, A]
defined type alias ErrorOr

scala> Kleisli[ErrorOr, Int, Int]((a:Int) => EitherT(4.right[String].point[Id]))
res22: scalaz.Kleisli[ErrorOr,Int,Int] = scalaz.KleisliFunctions$$anon$18@58b547a0

scala> Kleisli[ErrorOr, Int, Int]((a:Int) => EitherT("aa".left[Int].point[Id]))
res24: scalaz.Kleisli[ErrorOr,Int,Int] = scalaz.KleisliFunctions$$anon$18@342f2ceb

scala> for { a <- res22; b <- res24 } yield 2
res25: scalaz.Kleisli[ErrorOr,Int,Int] = scalaz.KleisliFunctions$$anon$18@204eab31

scala> res25.run(2).run
res26: scalaz.Scalaz.Id[scalaz.\/[String,Int]] = -\/(aa)

Вы можете заменить Keisli на IterateeT и Id на IO, чтобы получить то, что вам нужно.

Путь pipes делает это, чтобы набрать композицию класса с использованием Channel тип класса:

class Channel p where
    {-| 'idT' acts like a \'T\'ransparent proxy, passing all requests further
        upstream, and passing all responses further downstream. -}
    idT :: (Monad m) => a' -> p a' a a' a m r

    {-| Compose two proxies, satisfying all requests from downstream with
        responses from upstream. -}
    (>->) :: (Monad m)
          => (b' -> p a' a b' b m r)
          -> (c' -> p b' b c' c m r)
          -> (c' -> p a' a c' c m r)
    p1 >-> p2 = p2 <-< p1

... и вывел поднятую композицию поверх EitherT из базовой композиции. Это частный случай принципа прокси-преобразователей, введенный в pipes-2.4, что позволяет поднимать композицию над произвольными расширениями.

Этот подъем требует определения EitherT специализируется на форме Proxy введите Control.Proxy.Trans.Either:

newtype EitherP e p a' a b' b (m :: * -> *) r
  = EitherP { runEitherP :: p a' a b' b m (Either e r) }

Эта специализация на Proxy форма необходима для того, чтобы иметь возможность определить хорошо типизированный экземпляр Channel учебный класс. Scala может быть более гибким в этом отношении, чем Haskell.

Тогда я просто переопределить Monad экземпляр (и другие экземпляры) вместе со всеми обычными EitherT операции для этого специализированного типа:

throw :: (Monad (p a' a b' b m)) => e -> EitherP e p a' a b' b m r
throw = EitherP . return . Left

catch
 :: (Monad (p a' a b' b m))
 => EitherP e p a' a b' b m r        -- ^ Original computation
 -> (e -> EitherP f p a' a b' b m r) -- ^ Handler
 -> EitherP f p a' a b' b m r        -- ^ Handled computation
catch m f = EitherP $ do
    e <- runEitherP m
    runEitherP $ case e of
        Left  l -> f     l
        Right r -> right r

Имея это в виду, я могу определить следующий экземпляр поднятой композиции:

-- Given that 'p' is composable, so is 'EitherP e p'
instance (Channel p) => Channel (EitherP e p) where
    idT = EitherP . idT
    p1 >-> p2 = (EitherP .) $ runEitherP . p1 >-> runEitherP . p2

Чтобы понять, что там происходит, просто следуйте инструкциям:

p1 :: b' -> EitherP e p a' a b' b m r
p2 :: c' -> EitherP e p b' b c' c m r

runEitherP . p1 :: b' -> p a' a b' b m (Either e r)
runEitherP . p2 :: c' -> p b' b c' c m (Either e r)

-- Use the base composition for 'p'
runEitherP . p1 >-> runEitherP . p2
 :: c' -> p a' a c' c m (Either e r)

-- Rewrap in EitherP
(EitherP . ) $ runEitherP . p1 >-> runEitherP . p2
 :: c' -> EitherP e p a' a c' c m r

Это позволяет генерировать и отлавливать ошибки на определенном этапе, не прерывая другие этапы. Вот пример, который я скопировал и вставил из моего pipes-2.4 объявление сообщение:

import Control.Monad (forever)
import Control.Monad.Trans (lift)
import Control.Proxy
import Control.Proxy.Trans.Either as E
import Safe (readMay)

promptInts :: () -> EitherP String Proxy C () () Int IO r
promptInts () = recover $ forever $ do
    str <- lift getLine
    case readMay str of
        Nothing -> E.throw "Could not parse an integer"
        Just n  -> liftP $ respond n

recover p =
    p `E.catch` (\str -> lift (putStrLn str) >> recover p)

main = runProxy $ runEitherK $ mapP printD <-< promptInts

Вот результат:

>>> main
1<Enter>
1
Test<Enter>
Could not parse an integer
Apple<Enter>
Could not parse an integer
5<Enter>
5

Ответ на итеративный подход аналогичен. Вы должны взять существующий способ составления итерированных и перебрать его EitherT, Используете ли вы классы типов или просто определяете новый оператор композиции, зависит только от вас.

Некоторые другие полезные ссылки:

Другие вопросы по тегам