Объединить двух потребителей в один потребитель, который возвращает несколько значений?
Я экспериментировал с новым пакетом pipe-http и подумал. У меня есть два парсера для веб-страницы, один из которых возвращает позиции, а другой - число из другого места на странице. Когда я беру страницу, было бы неплохо объединить эти парсеры и получить их результаты в одно и то же время от одного и того же производителя тестирующих строк, вместо того, чтобы извлекать страницу дважды или извлекать весь HTML-код в память и анализировать его дважды.
Другими словами, скажем, у вас есть два потребителя:
c1 :: Consumer a m r1
c2 :: Consumer a m r2
Можно ли сделать такую функцию:
combineConsumers :: Consumer a m r1 -> Consumer a m r2 -> Consumer a m (r1, r2)
combineConsumers = undefined
Я попробовал несколько вещей, но я не могу понять это. Я понимаю, если это невозможно, но это было бы удобно.
Редактировать:
Мне жаль, что оказалось, что я делал предположение о pipe-attoparsec, из-за моего опыта с pipeit-attoparsec, который заставил меня задать неправильный вопрос. Pipes-attoparsec превращает attoparsec в анализатор каналов, когда я только предполагал, что он вернет Consumer для каналов. Это означает, что я не могу на самом деле превратить два анализатора attoparsec в потребителей, которые берут текст и возвращают результат, а затем используют их с простой старой конвейерной экосистемой. Извините, но я просто не разбираюсь в трубе.
Несмотря на то, что это не помогает мне, ответ Артура в значительной степени соответствует тому, что я предполагал, когда задавал вопрос, и я, вероятно, в конечном итоге буду использовать его решение в будущем. А пока я просто собираюсь использовать трубопровод.
5 ответов
Я думаю, что с вашим поведением что-то не так, по причинам, которые Даворак упоминает в своем замечании. Но если вам действительно нужна такая функция, вы можете определить ее.
import Pipes.Internal
import Pipes.Core
zipConsumers :: Monad m => Consumer a m r -> Consumer a m s -> Consumer a m (r,s)
zipConsumers p q = go (p,q) where
go (p,q) = case (p,q) of
(Pure r , Pure s) -> Pure (r,s)
(M mpr , ps) -> M (do pr <- mpr
return (go (pr, ps)))
(pr , M mps) -> M (do ps <- mps
return (go (pr, ps)))
(Request _ f, Request _ g) -> Request () (\a -> go (f a, g a))
(Request _ f, Pure s) -> Request () (\a -> do r <- f a
return (r, s))
(Pure r , Request _ g) -> Request () (\a -> do s <- g a
return (r,s))
(Respond x _, _ ) -> closed x
(_ , Respond y _) -> closed y
Если вы "заархивируете" потребителей, не используя их возвращаемое значение, вы можете использовать только их "эффекты". tee consumer1 >-> consumer2
Если результаты "моноидальны", вы можете использовать tee
функция из прелюдии труб, в сочетании с WriterT
,
{-# LANGUAGE OverloadedStrings #-}
import Data.Monoid
import Control.Monad
import Control.Monad.Writer
import Control.Monad.Writer.Class
import Pipes
import qualified Pipes.Prelude as P
import qualified Data.Text as T
textSource :: Producer T.Text IO ()
textSource = yield "foo" >> yield "bar" >> yield "foo" >> yield "nah"
counter :: Monoid w => T.Text
-> (T.Text -> w)
-> Consumer T.Text (WriterT w IO) ()
counter word inject = P.filter (==word) >-> P.mapM (tell . inject) >-> P.drain
main :: IO ()
main = do
result <-runWriterT $ runEffect $
hoist lift textSource >->
P.tee (counter "foo" inject1) >-> (counter "bar" inject2)
putStrLn . show $ result
where
inject1 _ = (,) (Sum 1) mempty
inject2 _ = (,) mempty (Sum 1)
Обновление: как уже упоминалось в комментарии, реальная проблема, которую я вижу в том, что в pipes
парсеры не Consumers
, И как вы можете запустить два парсера одновременно, если они по-разному относятся к остаткам? Что произойдет, если один из парсеров захочет "отрисовать" какой-то текст, а другой - нет?
Одним из возможных решений является запуск парсеров по-настоящему параллельным образом в разных потоках. Примитивы в pipes-concurrency
пакет позволит вам "продублировать" Producer
записав одни и те же данные в два разных почтовых ящика. И тогда каждый парсер может делать все, что захочет, со своей собственной копией производителя. Вот пример, который также использует pipes-parse
, pipes-attoparsec
а также async
пакеты:
{-# LANGUAGE OverloadedStrings #-}
import Data.Monoid
import qualified Data.Text as T
import Data.Attoparsec.Text hiding (takeWhile)
import Data.Attoparsec.Combinator
import Control.Applicative
import Control.Monad
import Control.Monad.State.Strict
import Pipes
import qualified Pipes.Prelude as P
import qualified Pipes.Attoparsec as P
import qualified Pipes.Concurrent as P
import qualified Control.Concurrent.Async as A
parseChars :: Char -> Parser [Char]
parseChars c = fmap mconcat $
many (notChar c) *> many1 (some (char c) <* many (notChar c))
textSource :: Producer T.Text IO ()
textSource = yield "foo" >> yield "bar" >> yield "foo" >> yield "nah"
parseConc :: Producer T.Text IO ()
-> Parser a
-> Parser b
-> IO (Either P.ParsingError a,Either P.ParsingError b)
parseConc producer parser1 parser2 = do
(outbox1,inbox1,seal1) <- P.spawn' P.Unbounded
(outbox2,inbox2,seal2) <- P.spawn' P.Unbounded
feeding <- A.async $ runEffect $ producer >-> P.tee (P.toOutput outbox1)
>-> P.toOutput outbox2
sealing <- A.async $ A.wait feeding >> P.atomically seal1 >> P.atomically seal2
r <- A.runConcurrently $
(,) <$> (A.Concurrently $ parseInbox parser1 inbox1)
<*> (A.Concurrently $ parseInbox parser2 inbox2)
A.wait sealing
return r
where
parseInbox parser inbox = evalStateT (P.parse parser) (P.fromInput inbox)
main :: IO ()
main = do
(Right a, Right b) <- parseConc textSource (parseChars 'o') (parseChars 'a')
putStrLn . show $ (a,b)
Результат:
("oooo","aa")
Я не уверен, сколько накладных расходов вводит этот подход.
Идиоматическое решение - переписать ваш Consumer
с как Fold
или же FoldM
от foldl
библиотека, а затем объединить их, используя Applicative
стиль. Затем вы можете преобразовать этот комбинированный сгиб в тот, который работает на трубах.
Давайте предположим, что у вас есть два Fold
s:
fold1 :: Fold a r1
fold2 :: Fold a r2
... или два FoldM
s:
foldM1 :: Monad m => FoldM a m r1
foldM2 :: Monad m => FoldM a m r2
Затем вы объединяете их в один Fold
/FoldM
с помощью Applicative
стиль:
import Control.Applicative
foldBoth :: Fold a (r1, r2)
foldBoth = (,) <$> fold1 <*> fold2
foldBothM :: Monad m => FoldM a m (r1, r2)
foldBothM = (,) <$> foldM1 <*> foldM2
-- or: foldBoth = liftA2 (,) fold1 fold2
-- foldMBoth = liftA2 (,) foldM1 foldM2
Вы можете превратить любой фолд в Pipes.Prelude
сгиб стиля или Parser
, Вот необходимые функции преобразования:
import Control.Foldl (purely, impurely)
import qualified Pipes.Prelude as Pipes
import qualified Pipes.Parse as Parse
purely Pipes.fold
:: Monad m => Fold a b -> Producer a m () -> m b
impurely Pipes.foldM
:: Monad m => FoldM m a b -> Producer a m () -> m b
purely Parse.foldAll
:: Monad m => Fold a b -> Parser a m r
impurely Parse.foldMAll
:: Monad m => FoldM a m b -> Parser a m r
Причина для purely
а также impurely
функции так, что foldl
а также pipes
может взаимодействовать друг с другом без зависимости от другого. Кроме того, они позволяют библиотекам, кроме pipes
(лайк conduit
) повторно использовать foldl
без зависимости тоже (Подсказка, @MichaelSnoyman).
Я извиняюсь, что эта функция не задокументирована, в основном потому, что мне понадобилось время, чтобы понять, как ее получить. pipes
а также foldl
взаимодействовать без зависимости, и это было после того, как я написал pipes
руководство. Я обновлю учебник, чтобы указать на этот трюк.
Чтобы научиться использовать foldl
Просто прочитайте документацию в основном модуле. Это очень маленькая и простая в освоении библиотека.
Для чего бы то ни было, в мире каналов, соответствующая функция - это zipSinks. Может быть какой-то способ адаптировать эту функцию для работы с каналами, но может помешать автоматическое завершение.
Потребитель формирует монаду так
combineConsumers = liftM2 (,)
Тип проверки. К сожалению, семантика может отличаться от того, что вы ожидаете: первый потребитель будет работать до конца, а затем второй.