Объединить двух потребителей в один потребитель, который возвращает несколько значений?

Я экспериментировал с новым пакетом pipe-http и подумал. У меня есть два парсера для веб-страницы, один из которых возвращает позиции, а другой - число из другого места на странице. Когда я беру страницу, было бы неплохо объединить эти парсеры и получить их результаты в одно и то же время от одного и того же производителя тестирующих строк, вместо того, чтобы извлекать страницу дважды или извлекать весь HTML-код в память и анализировать его дважды.

Другими словами, скажем, у вас есть два потребителя:

c1 :: Consumer a m r1
c2 :: Consumer a m r2

Можно ли сделать такую ​​функцию:

combineConsumers :: Consumer a m r1 -> Consumer a m r2 -> Consumer a m (r1, r2)
combineConsumers = undefined

Я попробовал несколько вещей, но я не могу понять это. Я понимаю, если это невозможно, но это было бы удобно.

Редактировать:

Мне жаль, что оказалось, что я делал предположение о pipe-attoparsec, из-за моего опыта с pipeit-attoparsec, который заставил меня задать неправильный вопрос. Pipes-attoparsec превращает attoparsec в анализатор каналов, когда я только предполагал, что он вернет Consumer для каналов. Это означает, что я не могу на самом деле превратить два анализатора attoparsec в потребителей, которые берут текст и возвращают результат, а затем используют их с простой старой конвейерной экосистемой. Извините, но я просто не разбираюсь в трубе.

Несмотря на то, что это не помогает мне, ответ Артура в значительной степени соответствует тому, что я предполагал, когда задавал вопрос, и я, вероятно, в конечном итоге буду использовать его решение в будущем. А пока я просто собираюсь использовать трубопровод.

5 ответов

Решение

Я думаю, что с вашим поведением что-то не так, по причинам, которые Даворак упоминает в своем замечании. Но если вам действительно нужна такая функция, вы можете определить ее.

import Pipes.Internal
import Pipes.Core

zipConsumers :: Monad m => Consumer a m r -> Consumer a m s -> Consumer a m (r,s)
zipConsumers p q = go (p,q) where
  go (p,q) = case (p,q) of 
     (Pure r     , Pure s)      -> Pure (r,s)
     (M mpr      , ps)          -> M (do pr <- mpr
                                         return (go (pr, ps)))
     (pr         , M mps)       -> M (do ps <- mps
                                         return (go (pr, ps)))
     (Request _ f, Request _ g) -> Request () (\a -> go (f a, g a))
     (Request _ f, Pure s)      -> Request () (\a -> do r <- f a
                                                        return (r, s))
     (Pure r     , Request _ g) -> Request () (\a -> do s <- g a
                                                        return (r,s))
     (Respond x _, _          ) -> closed x
     (_          , Respond y _) -> closed y

Если вы "заархивируете" потребителей, не используя их возвращаемое значение, вы можете использовать только их "эффекты". tee consumer1 >-> consumer2

Если результаты "моноидальны", вы можете использовать tee функция из прелюдии труб, в сочетании с WriterT,

{-# LANGUAGE OverloadedStrings #-}

import Data.Monoid
import Control.Monad
import Control.Monad.Writer
import Control.Monad.Writer.Class
import Pipes
import qualified Pipes.Prelude as P
import qualified Data.Text as T

textSource :: Producer T.Text IO ()
textSource = yield "foo" >> yield "bar" >> yield "foo" >> yield "nah"

counter :: Monoid w => T.Text 
                    -> (T.Text -> w) 
                    -> Consumer T.Text (WriterT w IO) ()
counter word inject = P.filter (==word) >-> P.mapM (tell . inject) >-> P.drain

main :: IO ()
main = do
    result <-runWriterT $ runEffect $ 
        hoist lift textSource >-> 
        P.tee (counter "foo" inject1) >-> (counter "bar" inject2)
    putStrLn . show $ result
    where
    inject1 _ = (,) (Sum 1) mempty
    inject2 _ = (,) mempty (Sum 1)

Обновление: как уже упоминалось в комментарии, реальная проблема, которую я вижу в том, что в pipes парсеры не Consumers, И как вы можете запустить два парсера одновременно, если они по-разному относятся к остаткам? Что произойдет, если один из парсеров захочет "отрисовать" какой-то текст, а другой - нет?

Одним из возможных решений является запуск парсеров по-настоящему параллельным образом в разных потоках. Примитивы в pipes-concurrency пакет позволит вам "продублировать" Producer записав одни и те же данные в два разных почтовых ящика. И тогда каждый парсер может делать все, что захочет, со своей собственной копией производителя. Вот пример, который также использует pipes-parse, pipes-attoparsec а также async пакеты:

{-# LANGUAGE OverloadedStrings #-}

import Data.Monoid
import qualified Data.Text as T
import Data.Attoparsec.Text hiding (takeWhile)
import Data.Attoparsec.Combinator
import Control.Applicative
import Control.Monad
import Control.Monad.State.Strict
import Pipes
import qualified Pipes.Prelude as P
import qualified Pipes.Attoparsec as P
import qualified Pipes.Concurrent as P
import qualified Control.Concurrent.Async as A

parseChars :: Char -> Parser [Char] 
parseChars c = fmap mconcat $ 
    many (notChar c) *> many1 (some (char c) <* many (notChar c))

textSource :: Producer T.Text IO ()
textSource = yield "foo" >> yield "bar" >> yield "foo" >> yield "nah"

parseConc :: Producer T.Text IO () 
          -> Parser a 
          -> Parser b 
          -> IO (Either P.ParsingError a,Either P.ParsingError b)
parseConc producer parser1 parser2 = do
    (outbox1,inbox1,seal1) <- P.spawn' P.Unbounded
    (outbox2,inbox2,seal2) <- P.spawn' P.Unbounded
    feeding <- A.async $ runEffect $ producer >-> P.tee (P.toOutput outbox1) 
                                              >->        P.toOutput outbox2
    sealing <- A.async $ A.wait feeding >> P.atomically seal1 >> P.atomically seal2
    r <- A.runConcurrently $ 
        (,) <$> (A.Concurrently $ parseInbox parser1 inbox1)
            <*> (A.Concurrently $ parseInbox parser2 inbox2)
    A.wait sealing
    return r 
    where
    parseInbox parser inbox = evalStateT (P.parse parser) (P.fromInput inbox)

main :: IO ()
main = do
    (Right a, Right b) <- parseConc textSource (parseChars 'o')  (parseChars 'a')
    putStrLn . show $ (a,b) 

Результат:

("oooo","aa")

Я не уверен, сколько накладных расходов вводит этот подход.

Идиоматическое решение - переписать ваш Consumerс как Fold или же FoldM от foldl библиотека, а затем объединить их, используя Applicative стиль. Затем вы можете преобразовать этот комбинированный сгиб в тот, который работает на трубах.

Давайте предположим, что у вас есть два Folds:

fold1 :: Fold a r1
fold2 :: Fold a r2

... или два FoldMs:

foldM1 :: Monad m => FoldM a m r1
foldM2 :: Monad m => FoldM a m r2

Затем вы объединяете их в один Fold/FoldM с помощью Applicative стиль:

import Control.Applicative

foldBoth :: Fold a (r1, r2)
foldBoth = (,) <$> fold1 <*> fold2

foldBothM :: Monad m => FoldM a m (r1, r2)
foldBothM = (,) <$> foldM1 <*> foldM2

-- or: foldBoth  = liftA2 (,) fold1  fold2
--     foldMBoth = liftA2 (,) foldM1 foldM2

Вы можете превратить любой фолд в Pipes.Preludeсгиб стиля или Parser, Вот необходимые функции преобразования:

import Control.Foldl (purely, impurely)
import qualified Pipes.Prelude as Pipes
import qualified Pipes.Parse   as Parse

purely Pipes.fold
    :: Monad m => Fold a b -> Producer a m () -> m b

impurely Pipes.foldM
    :: Monad m => FoldM m a b -> Producer a m () -> m b

purely Parse.foldAll
    :: Monad m => Fold a b -> Parser a m r

impurely Parse.foldMAll
    :: Monad m => FoldM a m b -> Parser a m r

Причина для purely а также impurely функции так, что foldl а также pipes может взаимодействовать друг с другом без зависимости от другого. Кроме того, они позволяют библиотекам, кроме pipes (лайк conduit) повторно использовать foldl без зависимости тоже (Подсказка, @MichaelSnoyman).

Я извиняюсь, что эта функция не задокументирована, в основном потому, что мне понадобилось время, чтобы понять, как ее получить. pipes а также foldl взаимодействовать без зависимости, и это было после того, как я написал pipes руководство. Я обновлю учебник, чтобы указать на этот трюк.

Чтобы научиться использовать foldlПросто прочитайте документацию в основном модуле. Это очень маленькая и простая в освоении библиотека.

Для чего бы то ни было, в мире каналов, соответствующая функция - это zipSinks. Может быть какой-то способ адаптировать эту функцию для работы с каналами, но может помешать автоматическое завершение.

Потребитель формирует монаду так

combineConsumers = liftM2 (,)

Тип проверки. К сожалению, семантика может отличаться от того, что вы ожидаете: первый потребитель будет работать до конца, а затем второй.

Другие вопросы по тегам