Добавление элементов в массив Swift в нескольких потоках, вызывающих проблемы (поскольку массивы не являются поточно-ориентированными), - как мне обойти это?

Я хочу добавить данные блоки в массив, а затем запустить все блоки, содержащиеся в массиве, по запросу. У меня есть код, похожий на этот:

class MyArrayBlockClass {
    private var blocksArray: Array<() -> Void> = Array()

    private let blocksQueue: NSOperationQueue()

    func addBlockToArray(block: () -> Void) {
        self.blocksArray.append(block)
    }

    func runBlocksInArray() {
        for block in self.blocksArray {
            let operation = NSBlockOperation(block: block)
            self.blocksQueue.addOperation(operation)
        }

        self.blocksQueue.removeAll(keepCapacity: false)
    }
}

Проблема заключается в том, что addBlockToArray может вызываться в нескольких потоках. Происходит то, что addBlockToArray вызывается в быстрой последовательности в разных потоках и добавляет только один из элементов, поэтому другой элемент не вызывается во время runBlocksInArray.

Я пробовал что-то вроде этого, который, кажется, не работает:

private let blocksDispatchQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)

func addBlockToArray(block: () -> Void) {
    dispatch_async(blocksDispatchQueue) {
        self.blocksArray.append(block)
    }
}

5 ответов

Вы определили свой blocksDispatchQueue быть глобальной очередью. Обновление этого для Swift 3 эквивалентно:

private let blocksDispatchQueue = DispatchQueue.global()

func addBlockToArray(block: @escaping () -> Void) {
    blocksDispatchQueue.async {
        self.blocksArray.append(block)
    }
}

Проблема состоит в том, что глобальные очереди являются параллельными очередями, поэтому вы не достигаете желаемой синхронизации. Но если бы вы создали свою собственную последовательную очередь, это было бы хорошо, например, в Swift 3:

private let blocksDispatchQueue = DispatchQueue(label: "com.domain.app.blocks")

Эта пользовательская очередь по умолчанию является последовательной очередью. Таким образом, вы достигнете желаемой синхронизации.

Обратите внимание, если вы используете это blocksDispatchQueue чтобы синхронизировать ваше взаимодействие с этой очередью, все взаимодействие с этим blocksArray должен координироваться через эту очередь, например, также отправлять код для добавления операций, используя ту же очередь:

func runBlocksInArray() {
    blocksDispatchQueue.async {
        for block in self.blocksArray {
            let operation = BlockOperation(block: block)
            self.blocksQueue.addOperation(operation)
        }

        self.blocksArray.removeAll()
    }
}

В качестве альтернативы вы также можете использовать шаблон чтения / записи, создавая собственную параллельную очередь:

private let blocksDispatchQueue = DispatchQueue(label: "com.domain.app.blocks", attributes: .concurrent)

Но в паттерне "читатель-писатель" записи должны выполняться с использованием барьера (достижение последовательного поведения при записи):

func addBlockToArray(block: @escaping () -> Void) {
    blocksDispatchQueue.async(flags: .barrier) {
        self.blocksArray.append(block)
    }
}

Но теперь вы можете читать данные, как указано выше:

blocksDispatchQueue.sync {
    someVariable = self.blocksArray[index]
}

Преимущество этого шаблона заключается в том, что записи синхронизируются, но чтения могут происходить одновременно по отношению друг к другу. Это, вероятно, не критично в этом случае (поэтому, вероятно, будет достаточно простой последовательной очереди), но для полноты картины я включил этот шаблон чтения-записи.

Если вы ищете примеры Swift 2, посмотрите предыдущую версию этого ответа.

Для синхронизации между потоками используйте dispatch_sync (не _async) и ваша собственная очередь отправки (не глобальная):

class MyArrayBlockClass {
    private var queue = dispatch_queue_create("andrew.myblockarrayclass", nil)

    func addBlockToArray(block: () -> Void) {
        dispatch_sync(queue) {
            self.blocksArray.append(block)
        } 
    }
    //....
}

dispatch_sync это хорошо и просто в использовании и должно быть достаточно для вашего случая (я использую это для всех моих потребностей синхронизации потока в настоящее время), но вы также можете использовать низкоуровневые блокировки и мьютексы. Есть отличная статья Майка Эша о различных вариантах: замки, безопасность потоков и Swift.

Создайте последовательную очередь и внесите изменения в массив в этом потоке. Ваш вызов создания потока должен быть примерно таким

private let blocksDispatchQueue = dispatch_queue_create("SynchronizedArrayAccess", DISPATCH_QUEUE_SERIAL)

Тогда вы можете использовать его так же, как сейчас.

func addBlockToArray(block: () -> Void) {
    dispatch_async(blocksDispatchQueue) {
        self.blocksArray.append(block)
    }
}

подробности

xCode 9.1, Swift 4

Решение

import Foundation

class AtomicArray<T>:  CustomStringConvertible {

    private let semaphore = DispatchSemaphore(value: 1)
    private var semaphors = [DispatchSemaphore]()
    private var array: [T]

    func append(newElement: T) {
        sync {
            array.append(newElement)
        }
    }

    subscript(index: Int) -> T {

        get {
            wait()
            defer {
                signal()
            }
            return array[index]
        }
        set(newValue) {
            wait()
            array[index] = newValue
            signal()
        }
    }

    var count: Int {
        wait()
        defer {
            signal()
        }
        return array.count
    }

    private func wait() {
        semaphore.wait()
    }

    private func signal() {
        semaphore.signal()
    }

    func set(closure: (_ curentArray: [T])->([T]) ) {
        sync {
            array = closure(array)
        }
    }

    func get(closure: (_ curentArray: [T])->()) {
        wait()
        closure(array)
        signal()
    }

    func get() -> [T] {
        wait()
        defer {
            signal()
        }
        return array
    }

    private func sync (closure:()->()) {
        wait()
        closure()
        signal()
    }

    init (array: [T]) {
        self.array = array
    }

    var description: String {
        wait()
        defer {
            signal()
        }
        return "\(array)"
    }
}

использование

Основная идея заключается в использовании синтаксиса регулярного массива

let atomicArray = AtomicArray(array: [3,2,1])

    print(atomicArray)
    atomicArray.append(newElement: 1)

    let arr = atomicArray.get()
    print(arr)
    atomicArray[2] = 0

    atomicArray.get { currentArray in
        print(currentArray)
    }

    atomicArray.set { currentArray -> [Int] in
        return currentArray.map{ item -> Int in
            return item*item
        }
    }
    print(atomicArray)

}

Результат использования

Полный образец

import UIKit

class ViewController: UIViewController {

    var atomicArray = AtomicArray(array: [Int](repeating: 0, count: 100))

    let dispatchGroup = DispatchGroup()

    override func viewDidLoad() {
        super.viewDidLoad()

        arrayInfo()

        sample { index, dispatch in
            self.atomicArray[index] += 1
        }

        dispatchGroup.notify(queue: .main) {
            self.arrayInfo()
            self.atomicArray.set { currentArray -> ([Int]) in
                return currentArray.map{ (item) -> Int in
                    return item + 100
                }
            }
           self.arrayInfo()
        }

    }

    private func arrayInfo() {
        print("Count: \(self.atomicArray.count)\nData: \(self.atomicArray)")
    }

    func sample(closure: @escaping (Int,DispatchQueue)->()) {

        print("----------------------------------------------\n")

        async(dispatch: .main, closure: closure)
        async(dispatch: .global(qos: .userInitiated), closure: closure)
        async(dispatch: .global(qos: .utility), closure: closure)
        async(dispatch: .global(qos: .default), closure: closure)
        async(dispatch: .global(qos: .userInteractive), closure: closure)
    }

    private func async(dispatch: DispatchQueue, closure: @escaping (Int,DispatchQueue)->()) {

        for index in 0..<atomicArray.count {
            dispatchGroup.enter()
            dispatch.async {
                closure(index,dispatch)
                self.dispatchGroup.leave()
            }
        }
    }
}

Полный образец результата

NSOperationQueue сам по себе потокобезопасен, так что вы можете установить suspended в true, добавьте все блоки, которые вы хотите из любого потока, а затем установите suspended в false, чтобы запустить все блоки.

Другие вопросы по тегам