Объединить каналы в один
Я ищу функцию, которая может сделать что-то похожее на:
merge :: MonadIO m => [Producer m a] -> Producer m a
Я быстро взглянул на stm-conduit
это выглядит похоже, но я не уверен, что это соответствует моим требованиям:
messagesSource :: MonadIO m => AmqpConn -> Ack -> Text -> Producer m (Message, Envelope)
messagesSource conn ack q = loop
where
loop = do
mmsg <- liftIO $ getMsg chan ack q
case mmsg of
Just (m, e) -> do
yield (m, e)
liftIO $ ackMsg chan (envDeliveryTag e) False
loop
Nothing -> loop
chan = fst $ amqpChan conn
Как вы можете видеть, этот производитель канала получает сообщение после его выдачи. В простом "однопоточном" конвейере он работает хорошо, сообщение попадает в приемник и затем фиксируется.
Однако с stm-conduit
это может измениться, потому что, насколько я понимаю, производитель не будет ждать, пока сообщение будет получено приемником, вместо этого они будут работать параллельно, и сообщение может быть преждевременно подтверждено.
Мое понимание stm-conduit
правильный?
И как можно объединить отдельные источники в один, чтобы получить приятную однопотоковую семантику?
ОБНОВЛЕНИЕ: Обновлен код для реального рабочего примера AMQP по запросу (однако он может быть немного шумнее).
ОБНОВЛЕНИЕ 2: я думаю, что то, что я после, могло быть Альтернативным экземпляром для источников канала, таким образом я мог сделать что-то вроде let src = src1 <|> src2
, Возможно ли это как-то?
2 ответа
mergeSources
в stm-conduit
поддерживает TBMChannel
за кулисами. Все ваши источники / производители сначала подключены к TBMChannel
Затем он создаст один источник, который попытается извлечь значения из канала FIFO.
Вы можете установить границы промежуточного TBMChannel
когда используешь mergeSources
, Допустим, вы установили границу n, тогда первые n значений, полученных всеми источниками, будут сброшены в TBMChannel
и AmqpConn
сразу, при условии, что он не заблокирован на AmqpConn
конец, и ваш потребитель медленнее, чем источники (кстати AmqpConn
использует неограниченный Control.Concurrent.Chan
так что не будет блокировать). После этого TBMChannel переполняется, поэтому источники, пытающиеся передать значение каналу, больше не блокируются. Ваш потребитель получает значение один за другим из объединенного источника, поэтому оно последовательно после первых n элементов.
Чтобы убедиться, что он последовательный с самого начала, вы можете установить ограничение на 1, однако это может вызвать некоторые проблемы с производительностью.
Посмотри на ZipSource
, который является оболочкой нового типа, чья Applicative
позволяет объединить Source
так, как вы хотите.
Когда у вас есть ZipSource
, ты можешь использовать zipSources
объединить Source
в Traversable
(например, список) в Source
из Traversable
s.
Единственное отличие от желаемого типа результата в том, что это Source
через Traversable
ценностей, а не просто одно значение, но это не должно быть большой проблемой.