Обработка потоков событий в haskell
Я хочу обработать поток событий, полученных через MQTT. Библиотека, которую я использую, использует обратный вызов для предоставления результатов. Обработка, которую я делаю, зависит от предыдущего состояния, а не только от последнего события. Также в будущем события могут быть собраны из других источников.
Сначала я решил составить его в список, который звучит как хорошая идея. У меня была небольшая проблема, потому что IO предотвращает ленивую оценку, и ожидание бесконечного потока может быть долгим, но я решил это с чередованием IO.
stream :: IO [Event]
позволяет мне делать хорошие вещи, такие как foldl
, foldM
map
, mapM
и т. д. К сожалению, при таком подходе я не смогу объединить два потока, потому что там больше нет функции блокировки.
Я копался во многих библиотеках и нашел STM с TQueue, например. К сожалению, это не то, что я хочу.
Я решил создать собственный тип и сделать его Foldable
так что я смогу сложить. Я потерпел неудачу из-за IO.
import Control.Concurrent.STM
newtype Stream a = Stream (STM a)
runStream
:: ((a -> IO ()) -> IO i)
-> IO (Stream a)
runStream block = do
queue <- newTQueueIO
block (atomically . writeTQueue queue)
return $ Stream (readTQueue queue)
foldStream :: (a -> b -> IO b) -> b -> Stream a -> IO b
foldStream f s (Stream read) = do
n <- atomically read
m <- f n s
foldStream f m (Stream read)
mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f (Stream read) = Stream $ f <$> read
zipStream :: [Stream a] -> Stream a
zipStream = undefined
Который может быть использован как main = foldStream (\x _ -> print x) () =<< events
Можно ли реализовать base некоторых базовых классов для работы с этим потоком, как с обычным List?
1 ответ
Обычный трюк в этих случаях - заставить обратный вызов записать в очередь, а затем прочитать с другого конца очереди.
Используя ограниченную закрываемую очередь из пакета stm-chans, мы можем определить эту функцию:
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
foldQueue :: TBMQueue a -> (x -> a -> IO x) -> IO x -> (x -> IO b) -> IO b
foldQueue queue step start done =
let go state =
do m <- atomically (readTBMQueue queue)
case m of
Nothing -> done state
Just a -> step state a >>= go
in start >>= go
Требуется канал, шаговая функция (аналогичная той, которая требуется для foldM
), действие для получения начального состояния и действие "выполнено", которое возвращает конечный результат и затем передает данные из канала, пока он не будет закрыт. Обратите внимание, что состояние сгиба x
выбирается абонентом foldQueue
,
Если позже мы захотим обновить монадические сгибы из пакета foldl, что очень полезно Applicative
экземпляр - мы можем сделать это так:
import qualified Control.Foldl as L
foldQueue' :: TBMQueue a -> L.FoldM IO a b -> IO b
foldQueue' queue = L.impurely (foldQueue queue)
С помощью impurely
из пакета "foldl".
Иногда (например, при разборе, группировании или декодировании) проще использовать потребителя на основе запросов. Мы можем сделать это с помощью потокового пакета:
import Streaming
import qualified Streaming.Prelude as S
foldQueue' :: TBMQueue a -> (Stream (Of a) IO () -> IO r) -> IO r
foldQueue' queue consume = consume (S.untilRight (do
m <- atomically (readTBMQueue queue)
return (case m of
Nothing -> Right ()
Just a -> Left a)))
Учитывая функцию, которая потребляет поток, мы передаем ей поток значений, считанных из очереди.
Часто чтение с канала и запись в него должны происходить в разных потоках. Мы можем использовать такие функции, как concurrently
от асинхронных, чтобы справиться с этим чисто.