Объединить каналы в один

Я ищу функцию, которая может сделать что-то похожее на:

merge :: MonadIO m => [Producer m a] -> Producer m a

Я быстро взглянул на stm-conduitэто выглядит похоже, но я не уверен, что это соответствует моим требованиям:

messagesSource :: MonadIO m => AmqpConn -> Ack -> Text -> Producer m (Message, Envelope)
messagesSource conn ack q = loop
  where
    loop = do
      mmsg <- liftIO $ getMsg chan ack q
      case mmsg of
        Just (m, e) -> do
          yield (m, e)
          liftIO $ ackMsg chan (envDeliveryTag e) False
          loop
        Nothing     -> loop
    chan = fst $ amqpChan conn

Как вы можете видеть, этот производитель канала получает сообщение после его выдачи. В простом "однопоточном" конвейере он работает хорошо, сообщение попадает в приемник и затем фиксируется.

Однако с stm-conduit это может измениться, потому что, насколько я понимаю, производитель не будет ждать, пока сообщение будет получено приемником, вместо этого они будут работать параллельно, и сообщение может быть преждевременно подтверждено.

Мое понимание stm-conduit правильный?
И как можно объединить отдельные источники в один, чтобы получить приятную однопотоковую семантику?

ОБНОВЛЕНИЕ: Обновлен код для реального рабочего примера AMQP по запросу (однако он может быть немного шумнее).

ОБНОВЛЕНИЕ 2: я думаю, что то, что я после, могло быть Альтернативным экземпляром для источников канала, таким образом я мог сделать что-то вроде let src = src1 <|> src2, Возможно ли это как-то?

2 ответа

Решение

mergeSources в stm-conduit поддерживает TBMChannel за кулисами. Все ваши источники / производители сначала подключены к TBMChannelЗатем он создаст один источник, который попытается извлечь значения из канала FIFO.

Вы можете установить границы промежуточного TBMChannel когда используешь mergeSources, Допустим, вы установили границу n, тогда первые n значений, полученных всеми источниками, будут сброшены в TBMChannel и AmqpConn сразу, при условии, что он не заблокирован на AmqpConn конец, и ваш потребитель медленнее, чем источники (кстати AmqpConn использует неограниченный Control.Concurrent.Chan так что не будет блокировать). После этого TBMChannel переполняется, поэтому источники, пытающиеся передать значение каналу, больше не блокируются. Ваш потребитель получает значение один за другим из объединенного источника, поэтому оно последовательно после первых n элементов.

Чтобы убедиться, что он последовательный с самого начала, вы можете установить ограничение на 1, однако это может вызвать некоторые проблемы с производительностью.

Посмотри на ZipSource, который является оболочкой нового типа, чья Applicative позволяет объединить Source так, как вы хотите.

Когда у вас есть ZipSource, ты можешь использовать zipSources объединить Source в Traversable (например, список) в Source из Traversable s.

Единственное отличие от желаемого типа результата в том, что это Source через Traversable ценностей, а не просто одно значение, но это не должно быть большой проблемой.

Другие вопросы по тегам