TMVar, но без буфера?
Я пытаюсь сделать общение между легкими нитями Haskell. Потоки хотят отправлять друг другу сообщения для общения и синхронизации.
Я изначально использовал TMVar
для этого, но я только что понял, что семантика неверна: TMVar
будет хранить одно сообщение внутри себя, поэтому размещение сообщения в пустом TMVar
не будет блокировать Он будет заблокирован только если вы отправите сообщение на полный TMVar
,
Может кто-нибудь предложить подобное STM
Конструкция IPC, которая:
- блокирует все записи, пока сообщение не будет использовано;
- будет блокировать все чтения, пока не будет предоставлено сообщение?
то есть труба нулевой длины была бы идеальной; но я не думаю BoundedChan
был бы счастлив, если бы я дал ему емкость 0. (Кроме того, это не STM
.)
3 ответа
Если я правильно понимаю вашу проблему, я не думаю, что вы можете, так как транзакционные гарантии означают, что транзакция A не может читать из записи транзакции B, пока транзакция B не будет зафиксирована, и в этот момент она больше не может блокироваться.
TMVar
является ближайшим к вам, если вы используете STM. Используя IO, вы можете построить структуру, которая завершает запись только тогда, когда читатель доступен (эта структура уже может существовать, но я не знаю об этом).
Я бы предложил переформулировать два требования:
- блокирует все записи, пока сообщение не будет использовано;
- будет блокировать все чтения, пока не будет предоставлено сообщение.
Проблема в терминах блока и потребляется / предоставляется. С STM нет понятия блока, есть только retry
, который имеет другую семантику: он перезапускает текущую транзакцию - он не ждет, пока что-то произойдет (это может вызвать взаимоблокировки). Поэтому мы не можем сказать "блокировать до...", мы можем только сказать что-то вроде "транзакция завершится успешно только тогда, когда...".
Точно так же, что означает "пока сообщение не будет использовано / предоставлено"? Поскольку транзакции являются атомарными, это может быть только "до тех пор, пока транзакция, которая потребила / предоставила сообщение, не прошла успешно".
Итак, давайте попробуем переформулировать:
- заставит все записи повторить попытку, пока транзакция, которая потребляет сообщение, не будет успешной;
- все попытки чтения будут повторяться до тех пор, пока транзакция, предоставляющая сообщение, не будет выполнена успешно.
Но теперь первый пункт не имеет смысла: если попытка записи повторяется, сообщение не будет использовано, транзакция не была приостановлена, она была отброшена и запущена заново - возможно, появилось другое сообщение!
Другими словами: любые данные могут покинуть транзакцию STM, только когда она завершится успешно (завершится). Это сделано специально - транзакции всегда являются атомарными с точки зрения внешнего мира / других транзакций - вы никогда не сможете наблюдать результаты только части транзакции. Вы никогда не сможете наблюдать, как две транзакции взаимодействуют.
Таким образом, очередь с нулевой длиной - плохая аналогия - она никогда не позволит передавать какие-либо данные. В конце любой транзакции она должна быть пустой, чтобы никакие данные никогда не проходили.
Тем не менее, я верю, что можно будет переформулировать требования в соответствии с вашими целями и впоследствии найти решение.
Вы говорите, что были бы счастливы, если одна сторона или другая IO
скорее, чем STM
, Так что это не так уж сложно кодировать. Давайте начнем с версии, которая имеет получение в IO
, Чтобы это произошло, получатель должен инициировать рукопожатие.
type SynchronousVar a = TChan (TMVar a)
send :: SynchronousVar a -> a -> STM a
receive :: SynchronousVar a -> IO a
send svar a = do
tmvar <- readTChan svar
putTMVar tmvar a
receive svar = do
tmvar <- newEmptyTMVarIO
atomically $ writeTChan svar tmvar
atomically $ takeTMVar tmvar
Аналогичный протокол может быть написан с отправкой начала рукопожатия.
type SynchronousVar a = TChan (a, TMVar ())
send :: SynchronousVar a -> a -> IO a
receive :: SynchronousVar a -> STM a
send svar a = do
tmvar <- newEmptyTMVarIO
atomically $ writeTChan svar (a, tmvar)
atomically $ takeTMVar tmvar
receive svar = do
(a, tmvar) <- readTChan svar
putTMvar tmvar ()
return a
Возможно, если вам действительно нужна синхронная связь, это потому, что вы хотите двустороннюю связь (то есть действие, которое выполняется в IO
хочет что-то узнать о нити, с которой он синхронизируется). Нетрудно расширить вышеприведенный протокол, чтобы выдать немного больше информации о синхронизации (добавив ее в одн кортеж в предыдущем случае или в TMVar
в последнем случае).