СТМ с частичной атомарностью для некоторых телевизоров

Я делаю вещи с STM и использовал среди прочего TBQueue структура данных с большим успехом. Полезная функция, для которой я ее использую, включает чтение из нее на основе предварительного условия в TVar в принципе так:

shouldRead <- readTVar shouldReadVar
if shouldRead
  then do
    a <- readTBQueue queue
    doSomethingWith a
  else doSomethingElse

Если мы предположим, что queue пусто и shouldReadVar содержит True перед выполнением этого блока, это приведет к readTBQueue призвание retry и блок будет повторно выполнен, когда shouldReadVar содержит False или же queue содержит элемент, что бы ни случилось первым.


Теперь мне нужна синхронная структура данных канала, похожая на структуру, описанную в этой статье (пожалуйста, прочитайте ее, если вы хотите понять этот вопрос), за исключением того, что она должна быть читаемой с предварительным условием, как в предыдущем примере, и, возможно, сочинять с другими вещами, а также.

Давайте назовем эту структуру данных SyncChan с writeSyncChan а также readSyncChan операции, определенные на нем.

И вот возможный вариант использования: этот (псевдо) код (который не будет работать, потому что я смешиваю концепции STM/IO):

shouldRead <- readTVar shouldReadVar
if shouldRead
  then do
    a <- readSyncChan syncChan
    doSomethingWith a
  else doSomethingElse

Предполагая, что никакой другой поток в настоящее время не блокирует writeSyncChan позвони и shouldReadChan содержит True Я хочу, чтобы блок " retry "пока либо shouldReadChan содержит False или другой поток блоков на writeSyncChan, Другими словами: когда одна нить retry на writeSyncChan и другой поток блоков достигает readSyncChan или наоборот, я хочу, чтобы значение передавалось по каналу. Во всех остальных случаях обе стороны должны быть в retry состояние и, следовательно, реагировать на изменение shouldReadVar, так что чтение или запись могут быть отменены.

Наивный подход, описанный в статье, связанной выше, с использованием двух (T) MVar Конечно, это невозможно. Поскольку структура данных является синхронной, ее невозможно использовать в течение двух atomically блоки, потому что вы не можете изменить один TMVar и ждать другого TMVar быть измененным в атомном контексте.

Вместо этого я ищу вид частичной атомарности, где я могу "зафиксировать" определенную часть транзакции и откатить ее только при изменении определенных переменных, но не других. Если у меня есть переменные "msg" и "ack", как в первом примере в приведенной выше статье, я хочу иметь возможность записать в переменную "msg", а затем дождаться, пока значение не придет в "ack", или пока другие транзакционные переменные для изменения. Если другие транзакционные переменные изменяются, весь атомарный блок должен быть повторен, и если приходит "ack" значение, транзакция должна продолжиться, как это было в предыдущем состоянии. Что касается чтения, должно произойти нечто подобное, за исключением того, что я, конечно, буду читать из "msg" и писать в "ack".

Это возможно сделать с помощью GHC STM, или мне нужно сделать ручную обработку MVar/ откат?

1 ответ

Это то, что вы хотите:

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad

data SyncChan a = SyncChan (TMVar a) (TMVar ())

newSyncChan :: IO (SyncChan a)
newSyncChan = do
    msg <- newEmptyTMVarIO
    ack <- newEmptyTMVarIO
    return (SyncChan msg ack)

readIf :: SyncChan a -> TVar Bool -> STM (Maybe a)
readIf (SyncChan msg ack) shouldReadVar = do
    b <- readTVar shouldReadVar
    if b
        then do
            a <- takeTMVar msg
            putTMVar ack ()
            return (Just a)
        else return Nothing

write :: SyncChan a -> a -> IO ()
write (SyncChan msg ack) a = do
    atomically $ putTMVar msg a
    atomically $ takeTMVar ack

main = do
    sc <- newSyncChan
    tv <- newTVarIO True
    forkIO $ forever $ forM_ [False, True] $ \b -> do
        threadDelay 2000000
        atomically $ writeTVar tv b
    forkIO $ forM_ [0..] $ \i -> do
        putStrLn "Writing..."
        write sc i
        putStrLn "Write Complete"
        threadDelay 300000
    forever $ do
        putStrLn "Reading..."
        a <- atomically $ readIf sc tv
        print a
        putStrLn "Read Complete"

Это дает поведение, которое вы имели в виду. В то время как TVar является True концы входа и выхода будут синхронизированы друг с другом. Когда TVar переключается на False тогда конец чтения свободно прерывается и возвращается Nothing,

Другие вопросы по тегам