Обработка потоков событий в haskell

Я хочу обработать поток событий, полученных через MQTT. Библиотека, которую я использую, использует обратный вызов для предоставления результатов. Обработка, которую я делаю, зависит от предыдущего состояния, а не только от последнего события. Также в будущем события могут быть собраны из других источников.

Сначала я решил составить его в список, который звучит как хорошая идея. У меня была небольшая проблема, потому что IO предотвращает ленивую оценку, и ожидание бесконечного потока может быть долгим, но я решил это с чередованием IO.

stream :: IO [Event] позволяет мне делать хорошие вещи, такие как foldl, foldMmap, mapMи т. д. К сожалению, при таком подходе я не смогу объединить два потока, потому что там больше нет функции блокировки.

Я копался во многих библиотеках и нашел STM с TQueue, например. К сожалению, это не то, что я хочу.

Я решил создать собственный тип и сделать его Foldable так что я смогу сложить. Я потерпел неудачу из-за IO.

import Control.Concurrent.STM

newtype Stream a = Stream (STM a)

runStream
  :: ((a -> IO ()) -> IO i)
  -> IO (Stream a)
runStream block = do
  queue <- newTQueueIO
  block (atomically . writeTQueue queue)
  return $ Stream (readTQueue queue)

foldStream :: (a -> b -> IO b) -> b -> Stream a -> IO b
foldStream f s (Stream read) = do
  n <- atomically read
  m <- f n s
  foldStream f m (Stream read)

mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f (Stream read) = Stream $ f <$> read

zipStream :: [Stream a] -> Stream a
zipStream = undefined

Который может быть использован как main = foldStream (\x _ -> print x) () =<< events

Можно ли реализовать base некоторых базовых классов для работы с этим потоком, как с обычным List?

1 ответ

Обычный трюк в этих случаях - заставить обратный вызов записать в очередь, а затем прочитать с другого конца очереди.

Используя ограниченную закрываемую очередь из пакета stm-chans, мы можем определить эту функцию:

import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue

foldQueue :: TBMQueue a -> (x -> a -> IO x) -> IO x -> (x -> IO b) -> IO b
foldQueue queue step start done =
    let go state = 
            do m <- atomically (readTBMQueue queue)
               case m of 
                   Nothing -> done state
                   Just a  -> step state a >>= go
     in start >>= go

Требуется канал, шаговая функция (аналогичная той, которая требуется для foldM), действие для получения начального состояния и действие "выполнено", которое возвращает конечный результат и затем передает данные из канала, пока он не будет закрыт. Обратите внимание, что состояние сгиба x выбирается абонентом foldQueue,

Если позже мы захотим обновить монадические сгибы из пакета foldl, что очень полезно Applicative экземпляр - мы можем сделать это так:

import qualified Control.Foldl as L

foldQueue' :: TBMQueue a -> L.FoldM IO a b -> IO b 
foldQueue' queue = L.impurely (foldQueue queue)

С помощью impurely из пакета "foldl".

Иногда (например, при разборе, группировании или декодировании) проще использовать потребителя на основе запросов. Мы можем сделать это с помощью потокового пакета:

import Streaming
import qualified Streaming.Prelude as S

foldQueue' :: TBMQueue a -> (Stream (Of a) IO () -> IO r) -> IO r
foldQueue' queue consume = consume (S.untilRight (do
    m <- atomically (readTBMQueue queue)
    return (case m of
        Nothing -> Right ()
        Just a -> Left a)))

Учитывая функцию, которая потребляет поток, мы передаем ей поток значений, считанных из очереди.

Часто чтение с канала и запись в него должны происходить в разных потоках. Мы можем использовать такие функции, как concurrently от асинхронных, чтобы справиться с этим чисто.

Другие вопросы по тегам