Добавление элементов в массив Swift в нескольких потоках, вызывающих проблемы (поскольку массивы не являются поточно-ориентированными), - как мне обойти это?
Я хочу добавить данные блоки в массив, а затем запустить все блоки, содержащиеся в массиве, по запросу. У меня есть код, похожий на этот:
class MyArrayBlockClass {
private var blocksArray: Array<() -> Void> = Array()
private let blocksQueue: NSOperationQueue()
func addBlockToArray(block: () -> Void) {
self.blocksArray.append(block)
}
func runBlocksInArray() {
for block in self.blocksArray {
let operation = NSBlockOperation(block: block)
self.blocksQueue.addOperation(operation)
}
self.blocksQueue.removeAll(keepCapacity: false)
}
}
Проблема заключается в том, что addBlockToArray может вызываться в нескольких потоках. Происходит то, что addBlockToArray вызывается в быстрой последовательности в разных потоках и добавляет только один из элементов, поэтому другой элемент не вызывается во время runBlocksInArray.
Я пробовал что-то вроде этого, который, кажется, не работает:
private let blocksDispatchQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)
func addBlockToArray(block: () -> Void) {
dispatch_async(blocksDispatchQueue) {
self.blocksArray.append(block)
}
}
5 ответов
Вы определили свой blocksDispatchQueue
быть глобальной очередью. Обновление этого для Swift 3 эквивалентно:
private let blocksDispatchQueue = DispatchQueue.global()
func addBlockToArray(block: @escaping () -> Void) {
blocksDispatchQueue.async {
self.blocksArray.append(block)
}
}
Проблема состоит в том, что глобальные очереди являются параллельными очередями, поэтому вы не достигаете желаемой синхронизации. Но если бы вы создали свою собственную последовательную очередь, это было бы хорошо, например, в Swift 3:
private let blocksDispatchQueue = DispatchQueue(label: "com.domain.app.blocks")
Эта пользовательская очередь по умолчанию является последовательной очередью. Таким образом, вы достигнете желаемой синхронизации.
Обратите внимание, если вы используете это blocksDispatchQueue
чтобы синхронизировать ваше взаимодействие с этой очередью, все взаимодействие с этим blocksArray
должен координироваться через эту очередь, например, также отправлять код для добавления операций, используя ту же очередь:
func runBlocksInArray() {
blocksDispatchQueue.async {
for block in self.blocksArray {
let operation = BlockOperation(block: block)
self.blocksQueue.addOperation(operation)
}
self.blocksArray.removeAll()
}
}
В качестве альтернативы вы также можете использовать шаблон чтения / записи, создавая собственную параллельную очередь:
private let blocksDispatchQueue = DispatchQueue(label: "com.domain.app.blocks", attributes: .concurrent)
Но в паттерне "читатель-писатель" записи должны выполняться с использованием барьера (достижение последовательного поведения при записи):
func addBlockToArray(block: @escaping () -> Void) {
blocksDispatchQueue.async(flags: .barrier) {
self.blocksArray.append(block)
}
}
Но теперь вы можете читать данные, как указано выше:
blocksDispatchQueue.sync {
someVariable = self.blocksArray[index]
}
Преимущество этого шаблона заключается в том, что записи синхронизируются, но чтения могут происходить одновременно по отношению друг к другу. Это, вероятно, не критично в этом случае (поэтому, вероятно, будет достаточно простой последовательной очереди), но для полноты картины я включил этот шаблон чтения-записи.
Если вы ищете примеры Swift 2, посмотрите предыдущую версию этого ответа.
Для синхронизации между потоками используйте dispatch_sync
(не _async) и ваша собственная очередь отправки (не глобальная):
class MyArrayBlockClass {
private var queue = dispatch_queue_create("andrew.myblockarrayclass", nil)
func addBlockToArray(block: () -> Void) {
dispatch_sync(queue) {
self.blocksArray.append(block)
}
}
//....
}
dispatch_sync
это хорошо и просто в использовании и должно быть достаточно для вашего случая (я использую это для всех моих потребностей синхронизации потока в настоящее время), но вы также можете использовать низкоуровневые блокировки и мьютексы. Есть отличная статья Майка Эша о различных вариантах: замки, безопасность потоков и Swift.
Создайте последовательную очередь и внесите изменения в массив в этом потоке. Ваш вызов создания потока должен быть примерно таким
private let blocksDispatchQueue = dispatch_queue_create("SynchronizedArrayAccess", DISPATCH_QUEUE_SERIAL)
Тогда вы можете использовать его так же, как сейчас.
func addBlockToArray(block: () -> Void) {
dispatch_async(blocksDispatchQueue) {
self.blocksArray.append(block)
}
}
подробности
xCode 9.1, Swift 4
Решение
import Foundation
class AtomicArray<T>: CustomStringConvertible {
private let semaphore = DispatchSemaphore(value: 1)
private var semaphors = [DispatchSemaphore]()
private var array: [T]
func append(newElement: T) {
sync {
array.append(newElement)
}
}
subscript(index: Int) -> T {
get {
wait()
defer {
signal()
}
return array[index]
}
set(newValue) {
wait()
array[index] = newValue
signal()
}
}
var count: Int {
wait()
defer {
signal()
}
return array.count
}
private func wait() {
semaphore.wait()
}
private func signal() {
semaphore.signal()
}
func set(closure: (_ curentArray: [T])->([T]) ) {
sync {
array = closure(array)
}
}
func get(closure: (_ curentArray: [T])->()) {
wait()
closure(array)
signal()
}
func get() -> [T] {
wait()
defer {
signal()
}
return array
}
private func sync (closure:()->()) {
wait()
closure()
signal()
}
init (array: [T]) {
self.array = array
}
var description: String {
wait()
defer {
signal()
}
return "\(array)"
}
}
использование
Основная идея заключается в использовании синтаксиса регулярного массива
let atomicArray = AtomicArray(array: [3,2,1])
print(atomicArray)
atomicArray.append(newElement: 1)
let arr = atomicArray.get()
print(arr)
atomicArray[2] = 0
atomicArray.get { currentArray in
print(currentArray)
}
atomicArray.set { currentArray -> [Int] in
return currentArray.map{ item -> Int in
return item*item
}
}
print(atomicArray)
}
Результат использования
Полный образец
import UIKit
class ViewController: UIViewController {
var atomicArray = AtomicArray(array: [Int](repeating: 0, count: 100))
let dispatchGroup = DispatchGroup()
override func viewDidLoad() {
super.viewDidLoad()
arrayInfo()
sample { index, dispatch in
self.atomicArray[index] += 1
}
dispatchGroup.notify(queue: .main) {
self.arrayInfo()
self.atomicArray.set { currentArray -> ([Int]) in
return currentArray.map{ (item) -> Int in
return item + 100
}
}
self.arrayInfo()
}
}
private func arrayInfo() {
print("Count: \(self.atomicArray.count)\nData: \(self.atomicArray)")
}
func sample(closure: @escaping (Int,DispatchQueue)->()) {
print("----------------------------------------------\n")
async(dispatch: .main, closure: closure)
async(dispatch: .global(qos: .userInitiated), closure: closure)
async(dispatch: .global(qos: .utility), closure: closure)
async(dispatch: .global(qos: .default), closure: closure)
async(dispatch: .global(qos: .userInteractive), closure: closure)
}
private func async(dispatch: DispatchQueue, closure: @escaping (Int,DispatchQueue)->()) {
for index in 0..<atomicArray.count {
dispatchGroup.enter()
dispatch.async {
closure(index,dispatch)
self.dispatchGroup.leave()
}
}
}
}
Полный образец результата
NSOperationQueue
сам по себе потокобезопасен, так что вы можете установить suspended
в true, добавьте все блоки, которые вы хотите из любого потока, а затем установите suspended
в false, чтобы запустить все блоки.