Как использовать абонента ZMQ в Haskell с трубами
Я могу заставить подписчика ZMQ работать в Haskell, но был бы признателен за рекомендации о том, как использовать эти данные с Pipes. Моя попытка написать Producer не удалась при "сборке стека" со следующей ошибкой:
Не удалось сопоставить тип "Proxy X () c'0 c0 (ZMQ z)" с "ZMQ z"
Ожидаемый тип: ZMQ z ()
Фактический тип: Proxy X () c'0 c0 (ZMQ z) ()
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Control.Monad
import Pipes
import qualified Pipes.Prelude as P
import System.ZMQ4.Monadic
import qualified Data.ByteString.Char8 as CS
fromZMQ :: (Receiver r) => Socket z r -> Producer String (ZMQ z) ()
fromZMQ sock = do
msg <- lift $ receive sock
yield (CS.unpack msg)
fromZMQ sock
main :: IO ()
main = --do
runZMQ $ do
subSock <- socket Sub ---subscriptionSocket
subscribe subSock ""
connect subSock "tcp://127.0.0.1:4998"
forever $ fromZMQ subSock >-> P.take 3 >-> P.print
Обратите внимание, я хочу использовать данные, публикуемые в ZMQ скриптом Python.
1 ответ
Единственная проблема в вашем коде - последняя строка.
У вас там есть "труба":
fromZMQ subSock >-> P.take 3 >-> P.print :: Effect (ZMQ z) ()
и когда вы подаете заявку forever
тип остается неизменным. Но то, что вам нужно, это просто ZMQ z
другими словами, вам нужно на самом деле выполнить pipes
вычисление и вы используете runEffect
Функция для этого. Как примечание, вам на самом деле не нужно forever
поскольку поток никогда не закончится в любом случае.
Итак, все, что вам нужно сделать, это заменить forever
с runEffect
в последней строке.
Отработав предложение Томаса об использовании Чана, я адаптировал пример MVar (ссылка ниже) для накопления полученных строк и их количества для демонстрации состояния обновления и чтения.
https://www.oreilly.com/library/view/parallel-and-concurrent/9781449335939/ch07.html
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Control.Concurrent
import Control.Monad
import System.ZMQ4.Monadic
import qualified Data.ByteString.Char8 as CS
newtype State = State (MVar (Int, [CS.ByteString]) ) --(count, list of strings received over zmq)
newState :: IO State
newState = do
m <- newMVar (0, [])
return (State m)
updateState :: State -> CS.ByteString -> IO ()
updateState (State m) newString = do
(count,strList) <- takeMVar m
putMVar m ( count + 1 , strList ++ [newString] )
showState :: State -> IO String
showState (State m) = do
count <- takeMVar m
putMVar m count --return the lock; no changes
return (show count)
main = runZMQ $ do
sub <- socket Sub
subscribe sub ""
connect sub "tcp://127.0.0.1:4998"
s <- liftIO newState
forever $ do
receive sub >>= liftIO . updateState s
liftIO $ updateState s "hello" --'manually' add an additional string on each iteration
op <- liftIO $ showState s
liftIO $ print op