TMVar, но без буфера?

Я пытаюсь сделать общение между легкими нитями Haskell. Потоки хотят отправлять друг другу сообщения для общения и синхронизации.

Я изначально использовал TMVar для этого, но я только что понял, что семантика неверна: TMVar будет хранить одно сообщение внутри себя, поэтому размещение сообщения в пустом TMVar не будет блокировать Он будет заблокирован только если вы отправите сообщение на полный TMVar,

Может кто-нибудь предложить подобное STM Конструкция IPC, которая:

  • блокирует все записи, пока сообщение не будет использовано;
  • будет блокировать все чтения, пока не будет предоставлено сообщение?

то есть труба нулевой длины была бы идеальной; но я не думаю BoundedChan был бы счастлив, если бы я дал ему емкость 0. (Кроме того, это не STM.)

3 ответа

Если я правильно понимаю вашу проблему, я не думаю, что вы можете, так как транзакционные гарантии означают, что транзакция A не может читать из записи транзакции B, пока транзакция B не будет зафиксирована, и в этот момент она больше не может блокироваться.

TMVar является ближайшим к вам, если вы используете STM. Используя IO, вы можете построить структуру, которая завершает запись только тогда, когда читатель доступен (эта структура уже может существовать, но я не знаю об этом).

Я бы предложил переформулировать два требования:

  • блокирует все записи, пока сообщение не будет использовано;
  • будет блокировать все чтения, пока не будет предоставлено сообщение.

Проблема в терминах блока и потребляется / предоставляется. С STM нет понятия блока, есть только retry, который имеет другую семантику: он перезапускает текущую транзакцию - он не ждет, пока что-то произойдет (это может вызвать взаимоблокировки). Поэтому мы не можем сказать "блокировать до...", мы можем только сказать что-то вроде "транзакция завершится успешно только тогда, когда...".

Точно так же, что означает "пока сообщение не будет использовано / предоставлено"? Поскольку транзакции являются атомарными, это может быть только "до тех пор, пока транзакция, которая потребила / предоставила сообщение, не прошла успешно".

Итак, давайте попробуем переформулировать:

  • заставит все записи повторить попытку, пока транзакция, которая потребляет сообщение, не будет успешной;
  • все попытки чтения будут повторяться до тех пор, пока транзакция, предоставляющая сообщение, не будет выполнена успешно.

Но теперь первый пункт не имеет смысла: если попытка записи повторяется, сообщение не будет использовано, транзакция не была приостановлена, она была отброшена и запущена заново - возможно, появилось другое сообщение!

Другими словами: любые данные могут покинуть транзакцию STM, только когда она завершится успешно (завершится). Это сделано специально - транзакции всегда являются атомарными с точки зрения внешнего мира / других транзакций - вы никогда не сможете наблюдать результаты только части транзакции. Вы никогда не сможете наблюдать, как две транзакции взаимодействуют.

Таким образом, очередь с нулевой длиной - плохая аналогия - она ​​никогда не позволит передавать какие-либо данные. В конце любой транзакции она должна быть пустой, чтобы никакие данные никогда не проходили.

Тем не менее, я верю, что можно будет переформулировать требования в соответствии с вашими целями и впоследствии найти решение.

Вы говорите, что были бы счастливы, если одна сторона или другая IO скорее, чем STM, Так что это не так уж сложно кодировать. Давайте начнем с версии, которая имеет получение в IO, Чтобы это произошло, получатель должен инициировать рукопожатие.

type SynchronousVar a = TChan (TMVar a)

send :: SynchronousVar a -> a -> STM a
receive :: SynchronousVar a -> IO a

send svar a = do
    tmvar <- readTChan svar
    putTMVar tmvar a

receive svar = do
    tmvar <- newEmptyTMVarIO
    atomically $ writeTChan svar tmvar
    atomically $ takeTMVar tmvar

Аналогичный протокол может быть написан с отправкой начала рукопожатия.

type SynchronousVar a = TChan (a, TMVar ())

send :: SynchronousVar a -> a -> IO a
receive :: SynchronousVar a -> STM a

send svar a = do
    tmvar <- newEmptyTMVarIO
    atomically $ writeTChan svar (a, tmvar)
    atomically $ takeTMVar tmvar

receive svar = do
    (a, tmvar) <- readTChan svar
    putTMvar tmvar ()
    return a

Возможно, если вам действительно нужна синхронная связь, это потому, что вы хотите двустороннюю связь (то есть действие, которое выполняется в IO хочет что-то узнать о нити, с которой он синхронизируется). Нетрудно расширить вышеприведенный протокол, чтобы выдать немного больше информации о синхронизации (добавив ее в одн кортеж в предыдущем случае или в TMVar в последнем случае).

Другие вопросы по тегам