Существует ли концепция, подобная Iteratee, которая извлекает данные из нескольких источников?

Можно получить по запросу из ряда (скажем, два для простоты) источников, использующих потоки (ленивые списки). Итераторы могут использоваться для обработки данных, поступающих из одного источника.

Существует ли функциональная концепция, подобная Iteratee, для обработки нескольких источников ввода? Я мог бы представить себе Итератора, чье состояние сообщает, из какого источника он хочет получить данные.

4 ответа

Решение

Чтобы сделать это, используя трубы, вы вкладываете монадный трансформатор Pipe в себя, по одному разу для каждого производителя, с которым вы хотите взаимодействовать. Например:

import Control.Monad
import Control.Monad.Trans
import Control.Pipe

producerA, producerB :: (Monad m) => Producer Int m ()
producerA = mapM_ yield [1,2,3]
producerB = mapM_ yield [4,5,6]

consumes2 :: (Show a, Show b) =>
    Consumer a (Consumer b IO) r
consumes2 = forever $ do
    a <- await       -- await from outer producer
    b <- lift await  -- await from inner producer
    lift $ lift $ print (a, b)

Как и функция Curke для нескольких переменных на Haskell, вы частично применяете ее к каждому источнику, используя композицию и runPipe:

consumes1 :: (Show b) => Consumer b IO ()
consumes1 = runPipe $ consumes2 <+< producerA

fullyApplied :: IO ()
fullyApplied = runPipe $ consumes1 <+< producerB

Приведенная выше функция выводит при запуске:

>>> fullyApplied
(1, 4)
(2, 5)
(3, 6)

Этот трюк работает для получения или ожидания любого количества труб вверх или вниз по течению. Это также работает для прокси, двунаправленных аналогов для каналов.

Изменить: Обратите внимание, что это также работает для любой библиотеки Iteratee, а не только pipes, Фактически, Джон Миликин и Олег были первыми сторонниками этого подхода, и я просто украл у них эту идею.

Мы используем Машины в Scala, чтобы привлечь не только два, но произвольное количество источников.

Два примера бинарных объединений предоставлены самой библиотекой на Tee модуль: mergeOuterJoin а также hashJoin, Вот для чего нужен код hashJoin выглядит так (предполагается, что оба потока отсортированы):

/**
 * A natural hash join according to keys of type `K`.
 */
def hashJoin[A, B, K](f: A => K, g: B => K): Tee[A, B, (A, B)] = {
  def build(m: Map[K, A]): Plan[T[A, B], Nothing, Map[K, A]] = (for {
    a  <- awaits(left[A])
    mp <- build(m + (f(a) -> a))
  } yield mp) orElse Return(m)
  for {
    m <- build(Map())
    r <- (awaits(right[B]) flatMap (b => {
      val k = g(b)
      if (m contains k) emit(m(k) -> b) else Return(())
    })) repeatedly
  } yield r
}

Этот код создает Plan который "скомпилирован" в Machine с repeatedly метод. Тип строится здесь Tee[A, B, (A, B)] которая является машиной с двумя входами. Вы запрашиваете ввод слева и справа с awaits(left) а также awaits(right) и вы выводите с emit,

Существует также версия Machines для Haskell.

Трубопроводы (и это может быть построено для Pipes, но этот код еще не был выпущен) имеет zip примитив, который берет два восходящих потока и объединяет их в виде потока кортежей.

Проверьте библиотеку каналов, где вертикальная конкатенация может делать то, что вы хотите. Например,

import Control.Pipe
import Control.Monad
import Control.Monad.State
import Data.Void

source0, source1 :: Producer Char IO ()
source0 = mapM_ yield "say"
source1 = mapM_ yield "what"

sink :: Show b => Consumer b IO ()
sink = forever $ await >>= \x -> lift $ print x

pipeline :: Pipe () Void IO ()
pipeline = sink <+< (source0 >> source1)

Оператор секвенирования (>>) вертикально объединяет источники, получая выходные данные (на runPipe)

's'
'a'
'y'
'w'
'h'
'a'
't'
Другие вопросы по тегам