Разделяемое изменяемое состояние: когда использовать IORef

У меня есть основной поток, который пишет на карту и PSQ. И в Map, и в PSQ я использую одни и те же ключи, поэтому, глядя на PSQ, можно найти запись с минимальным приоритетом со сложностью O(1) и сопоставить ее значению на карте.

Теперь, пока мой основной поток добавляет / изменяет карту и PSQ, когда это необходимо, у меня есть второй поток, который постоянно (forever $ do) смотрит на PSQ, чтобы определить, когда самый старый ключ - N мс назад, а затем должен его сбросить.

Чтобы это произошло, оба потока должны просматривать одни и те же изменяемые данные. Каков наилучший способ сохранить состояние? Будет ли это случай для IOREfs? Какие еще были бы возможные способы решить эту проблему?

"Какой-то" пре-альфа код здесь:

import Data.Time
import Data.Functor
import Data.Time.Clock.POSIX
import qualified Data.PSQueue as PSQ
import qualified Data.Map as Map
import Data.Maybe
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Monad
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import qualified Data.ByteString.Char8 as B 

--PSQ = (host, PID) POSIXTime
--where the tuple is k and POSIXTime is p

--Map is (host, PortNumber) [messages]
--where the tuple is the key and [messages] is a list of messages

key = ("192.168.1.1", 4711)
messages = ["aaa", "bbbb", "ccccc"]

newRq :: IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
newRq = do
      time <- getPOSIXTime
      let q = PSQ.singleton key time
      let m = Map.singleton key messages
      return (q, m)

appendMsg :: String -> (String, Integer) -> Map.Map (String, Integer) [String] -> Map.Map (String, Integer) [String]
appendMsg newmsgs (host, port) m =
      let Just messages' = Map.lookup (host,port) m
          l = length . concat $ messages'
          l' = l + length newmsgs
      in 
      if l' < 1400 then Map.adjust (++ [newmsgs]) (host, port) m else m

insertNewRec :: (String, Integer) -> [String] -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
insertNewRec (a,b) c q m = do
      time <- getPOSIXTime
      let q1 = PSQ.insert (a,b) time q
      let m1 = Map.insert (a,b) c m
      return (q1, m1)

sendq :: Socket -> B.ByteString -> String -> PortNumber -> IO ()
sendq s datastring host port = do
      hostAddr <- inet_addr host
      sendAllTo s datastring (SockAddrInet port hostAddr)
      return ()

deleteRec :: (String, Integer) -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
deleteRec (host, port) q m = (q', m')
      where 
         m' = Map.delete (host, port) m
         q' = PSQ.delete (host, port) q

loopMyQ q m1 done = forever $ do 
      let Just m = PSQ.findMin q
      let time = (PSQ.prio m) + 0.200 --adds 200ms
      now <- getPOSIXTime
      if now < time
        then print (m1) 
        --here eventually I would call the send function to flush the queue
        else putMVar done ()

sendrecv :: Socket -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> String -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String])) 
sendrecv s q1 m1 msg = do
     let m2 = appendMsg msg key m1
         (q3, m3) = case m2 of   
                   val | m2 == m1 -> deleteRec key q1 m1
                       | otherwise -> (q1, m2)
     (q5, m5) <- if (m2 == m1) then (do (q4, m4) <- insertNewRec key (words msg) q3 m3
                                        return (q4, m4)) else return (q1, m2)
     when (m2 == m1) (let Just messages = Map.lookup ("192.168.1.1", 4711) m1 in sendq s (B.pack $ unwords messages) "192.168.1.1" 4711)
     return (q5, m5)

--main :: IO()
main = withSocketsDo $ do
     s <- socket AF_INET Datagram defaultProtocol
     (q1, m1) <- newRq
     done <- newEmptyMVar
     forkIO $ loopMyQ q1 m1 done
     (q', m') <- foldM (\(q, m) _ -> sendrecv s q m "ping") (q1, m1) [1..1000]
     takeMVar done
     --print ("longer than 200ms ago")

1 ответ

Решение

Скорее всего, вы хотите использовать MVars или TVars для поддержания согласованного состояния между потоками. IORefне являются потокобезопасными.

Я рекомендую использовать STM (и TVars) для этой проблемы. Вы имеете дело с одновременным доступом к нескольким структурам данных, и с компоновкой STM гораздо проще справиться, чем думать о порядке блокировки с MVars.

После просмотра вашего кода кажется, что TVars будет вашим лучшим выбором. Оберните свой PSQ и карту в два разных телевизора. Оберните весь код, который требует согласованного представления как в atomically сделка. В большинстве случаев ваш код будет "просто работать". Однако, если есть конфликт за блокировку, атомарный блок будет просто повторен, пока он не заработает.

Другие вопросы по тегам