Сделать два вызова функций атомарными?

У меня есть следующее использование очереди блоков https://msdn.microsoft.com/en-us/library/vstudio/hh297096(v=vs.100).aspx. Как сделать так, чтобы некоторые из выполняемых функций были атомарными?

Ниже приведен тестируемый пример.

// ----------------------------------------------------------------------------
// Blocking queue agent
// ----------------------------------------------------------------------------

open System
open System.Collections.Generic

type Agent<'T> = MailboxProcessor<'T>

type internal BlockingAgentMessage<'T> = 
  | Add of 'T * AsyncReplyChannel<unit> 
  | Get of AsyncReplyChannel<'T>

/// Agent that implements an asynchronous blocking queue
type BlockingQueueAgent<'T>(maxLength) =
  let agent = Agent.Start(fun agent ->

    let queue = new Queue<_>()

    let rec emptyQueue() = 
      agent.Scan(fun msg ->
        match msg with 
        | Add(value, reply) -> Some(enqueueAndContinue(value, reply))
        | _ -> None )
    and fullQueue() = 
      agent.Scan(fun msg ->
        match msg with 
        | Get(reply) -> Some(dequeueAndContinue(reply))
        | _ -> None )
    and runningQueue() = async {
      let! msg = agent.Receive()
      match msg with 
      | Add(value, reply) -> return! enqueueAndContinue(value, reply)
      | Get(reply) -> return! dequeueAndContinue(reply) }

    and enqueueAndContinue (value, reply) = async {
      reply.Reply() 
      queue.Enqueue(value)
      return! chooseState() }
    and dequeueAndContinue (reply) = async {
      reply.Reply(queue.Dequeue())
      return! chooseState() }
    and chooseState() = 
      if queue.Count = 0 then emptyQueue()
      elif queue.Count < maxLength then runningQueue()
      else fullQueue()

    // Start with an empty queue
    emptyQueue() )

  /// Asynchronously adds item to the queue. The operation ends when
  /// there is a place for the item. If the queue is full, the operation
  /// will block until some items are removed.
  member x.AsyncAdd(v:'T, ?timeout) = 
    agent.PostAndAsyncReply((fun ch -> Add(v, ch)), ?timeout=timeout)

  /// Asynchronously gets item from the queue. If there are no items
  /// in the queue, the operation will block unitl items are added.
  member x.AsyncGet(?timeout) = 
    agent.PostAndAsyncReply(Get, ?timeout=timeout)


// ----------------------------------------------------------------------------
open System.Threading

let ag = new BlockingQueueAgent<int>(3)

async { 
  for i in 0 .. 10 do 

    // The following 5 lines need to be atomic 
    printfn "1. before add %d" i
    Thread.Sleep(1000)
    printfn "2. before add %d" i
    Thread.Sleep(1000)
    printfn "3. before add %d" i

    do! ag.AsyncAdd(i)
    printfn "Added %d" i }
|> Async.Start

async { 
  while true do
    do! Async.Sleep(1000)
    let! v = ag.AsyncGet()

    // The following 5 lines need to be atomic
    printfn "1. process %d" v
    Thread.Sleep(1000)
    printfn "2. process  %d" v
    Thread.Sleep(1000)
    printfn "3. process  %d" v

    printfn "Got %d" v }
|> Async.Start

1 ответ

Вы могли бы использовать Mutex.

В.NET вы можете использовать класс Mutex, если вам нужна межпроцессная блокировка.

Если вам нужно синхронизировать только внутри домена приложений, возможно, достаточно блокировки..?

Другие вопросы по тегам