如何在 Swift 3 中创建线程安全变量?

How create thread safe variable in Swift 3?

我是单身狗class

class DeviceController:NSObject, CocoaMQTTDelegate {
     static let sharedInstance = DeviceController()
     var deviceOnArray:[String] = []
     var deviceOffArray:[String] = []
     private override init() {

        clientID = "xyz-" + String(ProcessInfo().processIdentifier)
        mqtt = CocoaMQTT(clientID: clientID, host: "device_controller.xyz.net", port: 1883)
        mqtt.username = "username"
        mqtt.password = "password"
        mqtt.willMessage = CocoaMQTTWill(topic: "/will", message: "dieout")
        mqtt.keepAlive = 30
        mqtt.cleanSession = true
        DeviceController.isConnecting = true
        super.init()
        mqtt.delegate = self
        mqtt.connect()
        self.registerBackgroundTask()
    }
   func sendArm(topic:String){
      // add device to deviceOnArray
   }
   func sendDisarm(topic:String){
      // remove device from deviceOnArray if exist here if I check by code that device is in array it returns false but on console if print it contains the device, It only heppens when I call sendArm and sendDisarm with a second.
     let lockQueue = DispatchQueue(label: "com.test.LockQueue")
        lockQueue.sync() {
           // all code now inside this
         }
     // I also used above code but it's not working
   }


}

如果您阅读代码,您就会知道我在从 deviceOnArray/deviceOffArray 读取正确值时遇到问题,我不确定如何解释这个问题,但我认为我需要的是 Obj- C原子线程安全变量。知道如何创建一个吗?

原子属性在这里帮不了你。它们旨在同步 属性 作为一个整体 的分配,而不是内部的分配(例如,它们不会将元素的 insertion/removal 同步到数组)。他们几乎只确保正确的 retain/release/autorelease 调用,以防止您的程序崩溃/泄漏。

你需要的是 Dispatch​Semaphore 或类似的东西(或者可能是更多原生的东西,posix pthread_mutex 的东西)来确保互斥访问。

您可以使用串行调度队列来确保数组仅以线程安全的方式更新。

最好将您的deviceOnArray 属性 更改为private 以确保它不能被其他对象访问。如果您需要将此数组公开给其他对象,请通过计算 属性 进行。例如

class DeviceController:NSObject, CocoaMQTTDelegate {
     static let sharedInstance = DeviceController()
     private var deviceOnArray:[String] = []
     var deviceOn: [String] {
         return self.deviceOnArray
     }

     var deviceOffArray:[String] = []
     private let dispatchQueue = DispatchQueue(label:"DeviceControllerQueue")

     private override init() {

        clientID = "xyz-" + String(ProcessInfo().processIdentifier)
        mqtt = CocoaMQTT(clientID: clientID, host: "device_controller.xyz.net", port: 1883)
        mqtt.username = "username"
        mqtt.password = "password"
        mqtt.willMessage = CocoaMQTTWill(topic: "/will", message: "dieout")
        mqtt.keepAlive = 30
        mqtt.cleanSession = true
        DeviceController.isConnecting = true
        super.init()
        mqtt.delegate = self
        mqtt.connect()
        self.registerBackgroundTask()
    }
   func sendArm(topic:String){
      // add device to deviceOnArray
       self.dispatchQueue.sync {
           deviceOnArray.append(topic)
       }
   }

   func sendDisarm(topic:String){
      // remove device from deviceOnArray if exist here.
       self.dispatchQueue.sync {
           if let index = self.deviceOnArray.index(of: topic) {
               self.deviceOnArray.remove(at: index)
           }
       }
   }
}

详情

  • Swift5.1,Xcode11.3.1

解决方案

import Foundation
class AtomicValue<T> {

    private var _value: T
    private var accessQueue: DispatchQueue!

    init (value: T) {
        _value = value
        let address = Unmanaged.passUnretained(self).toOpaque()
        let label = "accessQueue.\(type(of: self)).\(address)"
        accessQueue = DispatchQueue(label: label)
    }
}

// MARK: Get/Set with synchronization in current queue

extension AtomicValue {
    func waitSet(lockValueAccessWhile closure: ((_ currentValue: T) -> T)?) {
        accessQueue.sync { [weak self] in
            guard let self = self, let value = closure?(self._value) else { return }
            self._value = value
        }
    }

    func waitGet(lockValueAccessWhile closure: ((_ currentValue: T) -> Void)?) {
        accessQueue.sync { [weak self] in
            guard let self = self else { return }
            closure?(self._value)
        }
    }

    func waitUpdate(lockValueAccessWhile closure: ((_ currentValue: inout T) -> Void)?) {
        accessQueue.sync { [weak self] in
            guard let self = self else { return }
            closure?(&self._value)
        }
    }

    func waitMap<V>(lockValueAccessWhile closure: ((_ currentValue: T) -> V)?) -> NotQueueSafeValue<V> {
        var value: V!
        waitGet { value = closure?([=10=]) }
        return NotQueueSafeValue(notQueueSafeValue: value)
    }

    // Be careful with func waitGet() -> NotQueueSafeValue<T>. It is ONLY for WRITE (SAVE).
    // BAD CODE: atomicValue.waitGet().notQueueSafeValue.doSometing().
    //      it is possible that atomicValue._value could be changed while func doSometing() is performing
    // GOOD CODE: atomicValue.waitGet { [=10=].doSometing() }.
    //      atomicValue._value will never changed while func doSometing() is performing

    struct NotQueueSafeValue<T> { let notQueueSafeValue: T }

    func waitGet() -> NotQueueSafeValue<T> {
        var value: T!
        waitGet { value = [=10=] }
        return NotQueueSafeValue(notQueueSafeValue: value)
    }

    func waitSet(value: T) { waitSet { _ in return value } }
}

// MARK: Get/Set in current queue

extension AtomicValue {
    func set(waitAccessIn queue: DispatchQueue, closure: ((_ currentValue: T) -> T)?) {
        queue.async { [weak self] in self?.waitSet(lockValueAccessWhile: closure) }
    }

    func get(waitAccessIn queue: DispatchQueue, closure: ((_ currentValue: T) -> Void)?) {
        queue.async { [weak self] in self?.waitGet(lockValueAccessWhile: closure) }
    }

    func update(waitAccessIn queue: DispatchQueue, closure: ((_ currentValue: inout T) -> Void)?) {
        queue.async { [weak self] in self?.waitUpdate(lockValueAccessWhile: closure) }
    }
}

用法

// Usage
let atomicValue = AtomicValue(value: 0)

atomicValue.waitSet { currentValue -> Int in
    // Multiple sequential (sync) actions
    return currentValue + 1
}

atomicValue.waitGet { currentValue in
    // Multiple sequential (sync) actions
    print("\(currentValue)")
}

atomicValue.waitUpdate { currentValue in
    // Multiple sequential (sync) actions
    currentValue += 1
}

atomicValue.set(waitAccessIn: .global(qos: .default)) { currentValue -> Int in
    // Multiple sequential (sync) actions
    return currentValue + 1
}

atomicValue.get(waitAccessIn: .global(qos: .default)) { currentValue in
    // Multiple sequential (sync) actions
    print("\(currentValue)")
}

atomicValue.update(waitAccessIn: .global(qos: .background)) { currentValue in
    // Multiple sequential (sync) actions
    currentValue += 1
}

print(atomicValue.waitGet().notQueueSafeValue)
atomicValue.waitSet(value: 2)

print("\(atomicValue.waitMap { "value: \([=11=])" }.notQueueSafeValue )")

样本

Do not forget to paste the solution code here.

class Test {
    private let atomicValue = AtomicValue(value: 0)
    private(set) var count = 0
    private var usecDelay: UInt32 = 1000
    private let range = (0..<100)

    private func _set(currentValue: Int, dispatchGroup: DispatchGroup? = nil) -> Int {
        let newValue = currentValue + 1
        //print("\(queue.label) queue: \(newValue)")
        count += 1
        usleep(usecDelay)
        dispatchGroup?.leave()
        return newValue
    }

    private func _get(value: Int, queueLabel: String, dispatchGroup: DispatchGroup? = nil) {
        print("  get \(queueLabel) queue: \(value)")
        usleep(usecDelay)
        dispatchGroup?.leave()
    }

    func test1(queue: DispatchQueue, completion: (() -> Void)?) {
        queue.async { [weak self] in
            guard let self = self else { return }
            self.range.forEach { _ in
                self.atomicValue.waitSet { [weak self] in
                    return self?._set(currentValue: [=12=]) ?? [=12=]
                }
                self.atomicValue.waitGet { [weak self] in
                    self?._get(value: [=12=], queueLabel: queue.label)
                }
            }
            print("test1.work at \(queue.label) queue completed")
            completion?()
        }
    }

    func test2(queue: DispatchQueue, completion: (() -> Void)?) {
        let dispatchGroup = DispatchGroup()
        range.forEach { _ in
            dispatchGroup.enter()
            dispatchGroup.enter()
        }
        queue.async { [weak self] in
            guard let self = self else { return }
            self.range.forEach { _ in
                self.atomicValue.waitUpdate { [weak self] in
                    guard let value = self?._set(currentValue: [=12=], dispatchGroup: dispatchGroup) else { return }
                    [=12=] = value
                }
                self.atomicValue.waitGet { [weak self] in
                    self?._get(value: [=12=], queueLabel: queue.label, dispatchGroup: dispatchGroup)
                }
            }
        }
        dispatchGroup.notify(queue: queue) {
            print("test2.work at \(queue.label) queue completed")
            completion?()
        }
    }


    func test3(queue: DispatchQueue, waitIn waitQueue: DispatchQueue, completion: (() -> Void)?) {
        let dispatchGroup = DispatchGroup()
        range.forEach { _ in
            dispatchGroup.enter()
            dispatchGroup.enter()
            dispatchGroup.enter()
        }
        queue.async { [weak self] in
            guard let self = self else { return }
            self.range.forEach { _ in
                self.atomicValue.set(waitAccessIn: waitQueue) { [weak self] in
                    return self?._set(currentValue: [=12=], dispatchGroup: dispatchGroup) ?? [=12=]
                }
                self.atomicValue.get(waitAccessIn: waitQueue)  { [weak self] in
                    self?._get(value: [=12=], queueLabel: queue.label, dispatchGroup: dispatchGroup)
                }
                self.atomicValue.update(waitAccessIn: waitQueue) { [weak self] in
                    guard let value = self?._set(currentValue: [=12=], dispatchGroup: dispatchGroup) else { return }
                    [=12=] = value
                }
            }
        }
        dispatchGroup.notify(queue: queue) {
            print("test3.work at \(queue.label) queue completed")
            completion?()
        }
    }

    func run() {
        let dspatchGroup = DispatchGroup()
        dspatchGroup.enter()
        test1(queue: .global(qos: .utility)) { dspatchGroup.leave() }                                           // result: count += range.max()
        dspatchGroup.enter()
        test1(queue: .global(qos: .unspecified))  { dspatchGroup.leave() }                                      // result: count += range.max()

        dspatchGroup.enter()
        test2(queue: .main) { dspatchGroup.leave() }                                                            // result: count += range.max()
        dspatchGroup.enter()
        test2(queue: .global(qos: .unspecified)) { dspatchGroup.leave() }                                       // result: count += range.max()

        dspatchGroup.enter()
        test3(queue: .global(qos: .default), waitIn: .global(qos: .background)) { dspatchGroup.leave() }        // result: count += 2*range.max()
        dspatchGroup.enter()
        test3(queue: .main, waitIn: .global(qos: .userInteractive)) { dspatchGroup.leave() }                    // result: count += 2*range.max()

        dspatchGroup.notify(queue: .main) { print("End. Count: \(self.count)") }
    }
}

Test().run()

结果(日志)

  get com.apple.root.default-qos queue: 3
  get com.apple.root.default-qos queue: 4
  get com.apple.root.default-qos queue: 4
  get com.apple.root.utility-qos queue: 5
  get com.apple.root.default-qos queue: 6
  get com.apple.root.default-qos queue: 7
  get com.apple.root.default-qos queue: 8
  get com.apple.root.default-qos queue: 9
  get com.apple.root.default-qos queue: 10
  get com.apple.root.default-qos queue: 11
  get com.apple.root.default-qos queue: 12
  get com.apple.root.default-qos queue: 13
  get com.apple.root.default-qos queue: 14
test3.work at com.apple.root.default-qos queue completed
  get com.apple.main-thread queue: 15
  get com.apple.root.default-qos queue: 17
  get com.apple.root.default-qos queue: 18
  get com.apple.root.utility-qos queue: 18
  get com.apple.main-thread queue: 19
  get com.apple.root.default-qos queue: 21
  get com.apple.root.default-qos queue: 22
  get com.apple.root.utility-qos queue: 22
  get com.apple.main-thread queue: 23
  get com.apple.root.default-qos queue: 25
  get com.apple.root.default-qos queue: 26
  get com.apple.root.utility-qos queue: 26
  get com.apple.main-thread queue: 27
  get com.apple.root.default-qos queue: 29
  get com.apple.root.default-qos queue: 30
  get com.apple.root.utility-qos queue: 30
  get com.apple.main-thread queue: 31
  get com.apple.root.default-qos queue: 33
  get com.apple.root.default-qos queue: 34
  get com.apple.root.utility-qos queue: 34
  get com.apple.main-thread queue: 35
  get com.apple.root.default-qos queue: 37
  get com.apple.root.default-qos queue: 38
  get com.apple.root.utility-qos queue: 38
  get com.apple.main-thread queue: 39
  get com.apple.root.default-qos queue: 41
  get com.apple.root.default-qos queue: 42
  get com.apple.root.utility-qos queue: 42
  get com.apple.main-thread queue: 43
  get com.apple.root.default-qos queue: 45
  get com.apple.root.default-qos queue: 46
  get com.apple.root.utility-qos queue: 46
  get com.apple.main-thread queue: 47
  get com.apple.root.default-qos queue: 49
test1.work at com.apple.root.default-qos queue completed
  get com.apple.root.default-qos queue: 50
test2.work at com.apple.root.default-qos queue completed
  get com.apple.root.utility-qos queue: 50
  get com.apple.main-thread queue: 50
test1.work at com.apple.root.utility-qos queue completed
  get com.apple.main-thread queue: 51
test2.work at com.apple.main-thread queue completed
  get com.apple.main-thread queue: 52
  get com.apple.main-thread queue: 53
  get com.apple.main-thread queue: 54
  get com.apple.main-thread queue: 55
  get com.apple.main-thread queue: 56
  get com.apple.main-thread queue: 57
  get com.apple.main-thread queue: 58
  get com.apple.main-thread queue: 59
  get com.apple.main-thread queue: 60
test3.work at com.apple.main-thread queue completed
End. Count: 60