Использовать очередь и семафор для параллелизма и оболочки свойств?

Я пытаюсь создать потокобезопасную оболочку свойств. Я мог думать только об очередях и семафорах GCD как о самом быстром и надежном способе. Является ли семафор более производительным (если это правда), или есть еще одна причина использовать один вместо другого для параллелизма?

Ниже приведены два варианта оберток атомарных свойств:

@propertyWrapper
struct Atomic<Value> {
    private var value: Value
    private let queue = DispatchQueue(label: "Atomic serial queue")

    var wrappedValue: Value {
        get { queue.sync { value } }
        set { queue.sync { value = newValue } }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}

@propertyWrapper
struct Atomic2<Value> {
    private var value: Value
    private var semaphore = DispatchSemaphore(value: 1)

    var wrappedValue: Value {
        get {
            semaphore.wait()
            let temp = value
            semaphore.signal()
            return temp
        }

        set {
            semaphore.wait()
            value = newValue
            semaphore.signal()
        }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}

struct MyStruct {
    @Atomic var counter = 0
    @Atomic2 var counter2 = 0
}

func test() {
    var myStruct = MyStruct()

    DispatchQueue.concurrentPerform(iterations: 1000) {
        myStruct.counter += $0
        myStruct.counter2 += $0
   }
}

Как их можно правильно протестировать и измерить, чтобы увидеть разницу между двумя реализациями и работают ли они вообще?

1 ответ

Решение

FWIW, другой вариант - шаблон читатель-писатель с параллельной очередью, где чтения выполняются синхронно, но им разрешено выполняться одновременно по отношению к другим чтениям, но записи выполняются асинхронно, но с барьером (т. Е. Не одновременно по отношению к любому другому читает или пишет):

@propertyWrapper
class Atomic<Value> {
    private var value: Value
    private let queue = DispatchQueue(label: "com.domain.app.atomic", attributes: .concurrent)

    var wrappedValue: Value {
        get { queue.sync { value } }
        set { queue.async(flags: .barrier) { self.value = newValue } }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}

Еще один - замки:

@propertyWrapper
struct Atomic<Value> {
    private var value: Value
    private var lock = NSLock()

    var wrappedValue: Value {
        get { lock.synchronized { value } }
        set { lock.synchronized { value = newValue } }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}

где

extension NSLocking {
    func synchronized<T>(block: () throws -> T) rethrows -> T {
        lock()
        defer { unlock() }
        return try block()
    }
}

Мы должны признать, что хотя они и ваша предлагают атомарность, они не будут обеспечивать поточно-ориентированное взаимодействие.

Рассмотрим этот простой эксперимент, в котором мы увеличиваем целое число в миллион раз:

@Atomic var foo = 0

func threadSafetyExperiment() {
    DispatchQueue.global().async {
        DispatchQueue.concurrentPerform(iterations: 1_000_000) { _ in
            self.foo += 1
        }
        print(self.foo)
    }
}

Вы ожидали fooбыть равным 1000 000, но этого не будет. Это потому, что все взаимодействие "получить значение, увеличить его и сохранить" должно быть заключено в единый механизм синхронизации.

Итак, вы вернулись к решениям, не связанным с оболочкой свойств, например

class Synchronized<Value> {
    private var _value: Value
    private let lock = NSLock()

    init(_ value: Value) {
        self._value = value
    }

    var value: Value {
        get { lock.synchronized { _value } }
        set { lock.synchronized { _value = newValue } }
    }

    func synchronized(block: (inout Value) -> Void) {
        lock.synchronized {
            block(&_value)
        }
    }
}

И тогда это отлично работает:

var foo = Synchronized<Int>(0)

func threadSafetyExperiment() {
    DispatchQueue.global().async {
        DispatchQueue.concurrentPerform(iterations: 1_000_000) { _ in
            self.foo.synchronized { value in
                value += 1
            }
        }
        print(self.foo.value)
    }
}

Как их можно правильно протестировать и измерить, чтобы увидеть разницу между двумя реализациями и работают ли они вообще?

Несколько мыслей:

  • Я бы предложил сделать более 1000 итераций. Вы хотите сделать достаточно итераций, чтобы результаты измерялись в секундах, а не в миллисекундах. Лично я использовал миллион итераций.

  • Фреймворк модульного тестирования идеален как для тестирования корректности, так и для измерения производительности с помощью measure метод (который повторяет тест производительности 10 раз для каждого модульного теста, а результаты фиксируются в отчетах модульного теста):

    Итак, создайте проект с целью модульного теста (или добавьте цель модульного теста в существующий проект, если хотите), а затем создайте модульные тесты и выполните их с command+ u.

  • Если вы отредактируете схему для своей цели, вы можете выбрать случайный порядок ваших тестов, чтобы убедиться, что порядок, в котором они выполняются, не влияет на производительность:

    Я бы также заставил тестовую цель использовать сборку выпуска, чтобы убедиться, что вы тестируете оптимизированную сборку.

Это пример различных видов синхронизации с использованием последовательной очереди GCD, параллельной очереди, блокировок, несправедливых блокировок, семафоров:

class SynchronizedSerial<Value> {
    private var _value: Value
    private let queue = DispatchQueue(label: "com.domain.app.atomic")

    required init(_ value: Value) {
        self._value = value
    }

    var value: Value {
        get { queue.sync { _value } }
        set { queue.async { self._value = newValue } }
    }

    func synchronized<T>(block: (inout Value) throws -> T) rethrows -> T {
        try queue.sync {
            try block(&_value)
        }
    }

    func writer(block: @escaping (inout Value) -> Void) -> Void {
        queue.async {
            block(&self._value)
        }
    }
}

class SynchronizedReaderWriter<Value> {
    private var _value: Value
    private let queue = DispatchQueue(label: "com.domain.app.atomic", attributes: .concurrent)

    required init(_ value: Value) {
        self._value = value
    }

    var value: Value {
        get { queue.sync { _value } }
        set { queue.async(flags: .barrier) { self._value = newValue } }
    }

    func synchronized<T>(block: (inout Value) throws -> T) rethrows -> T {
        try queue.sync(flags: .barrier) {
            try block(&_value)
        }
    }

    func reader<T>(block: (Value) throws -> T) rethrows -> T {
        try queue.sync {
            try block(_value)
        }
    }

    func writer(block: @escaping (inout Value) -> Void) -> Void {
        queue.async(flags: .barrier) {
            block(&self._value)
        }
    }
}

struct SynchronizedLock<Value> {
    private var _value: Value
    private let lock = NSLock()

    init(_ value: Value) {
        self._value = value
    }

    var value: Value {
        get { lock.synchronized { _value } }
        set { lock.synchronized { _value = newValue } }
    }

    mutating func synchronized<T>(block: (inout Value) throws -> T) rethrows -> T {
        try lock.synchronized {
            try block(&_value)
        }
    }
}

/// Unfair lock synchronization
///
/// - Warning: The documentation warns us: “In general, higher level synchronization primitives such as those provided by the pthread or dispatch subsystems should be preferred.”</quote>

class SynchronizedUnfairLock<Value> {
    private var _value: Value
    private var lock = os_unfair_lock()

    required init(_ value: Value) {
        self._value = value
    }

    var value: Value {
        get { synchronized { $0 } }
        set { synchronized { $0 = newValue } }
    }

    func synchronized<T>(block: (inout Value) throws -> T) rethrows -> T {
        os_unfair_lock_lock(&lock)
        defer { os_unfair_lock_unlock(&lock) }
        return try block(&_value)
    }
}

struct SynchronizedSemaphore<Value> {
    private var _value: Value
    private let semaphore = DispatchSemaphore(value: 1)

    init(_ value: Value) {
        self._value = value
    }

    var value: Value {
        get { semaphore.waitAndSignal { _value } }
        set { semaphore.waitAndSignal { _value = newValue } }
    }

    mutating func synchronized<T>(block: (inout Value) throws -> T) rethrows -> T {
        try semaphore.waitAndSignal {
            try block(&_value)
        }
    }
}

extension NSLocking {
    func synchronized<T>(block: () throws -> T) rethrows -> T {
        lock()
        defer { unlock() }
        return try block()
    }
}

extension DispatchSemaphore {
    func waitAndSignal<T>(block: () throws -> T) rethrows -> T {
        wait()
        defer { signal() }
        return try block()
    }
}
Другие вопросы по тегам