如何使用 Combine 向服务器发送异步加速度计更新

How to use Combine to send async accelerometer updates to server

我的目标是以周期性有效负载的形式将异步加速度计读数发送到服务器。

加速度计数据在网络请求期间离线和并发地继续,因此我需要处理网络故障以及在每个网络请求期间到达的数据。

我不雅的方法是将每个新更新附加到一个数组中:

motionManager.startAccelerometerUpdates(to: .main) { data, error in
    dataArray.append(data)
}

然后定期向服务器发送一组值(network 是我对 NWPathMonitor() 的包装):

let timer = Timer(fire: Date(), interval: 5, // Every 5 seconds
              repeats: true, block: { timer in
                if network.connected {
                    postAccelerometerData(payload: dataArray) { success
                        if success {
                            dataArray.removeAll()
                        }
                    }
                }
            })


RunLoop.current.add(timer, forMode: RunLoop.Mode.default)

这种方法的主要问题是在网络请求触发和完成之间添加的数组元素将从数组中删除,而不会发送到服务器。

我有一些关于为每个网络请求添加队列和使 X 元素出队的想法(但是如果请求失败,我是否将它们添加回队列?)。

我忍不住认为有更好的方法来解决这个问题,使用 Combine 将这些加速度计更新“流”到某种数据结构以缓冲它们,然后将它们发送到服务器。

postAccelerometerData() 函数只是编码一个 JSON 结构并发出网络请求。没什么特别的。

Combine 有一种方法可以在一定时间内收集值并发出一个数组。因此,您可以围绕该方法编排您的解决方案,方法是使用 PassthroughSubject 发送每个值,并使用 .collect 运算符和 byTime 策略将值收集到数组中。

let accelerometerData = PassthroughSubject<CMAccelerometerData, Never>()
motionManager.startAccelerometerUpdates(to: .main) { data, error in
   guard let data = data else { return } // for demo purposes, ignoring errors
   accelerometerData.send(data)
}
// set up a pipeline that periodically sends data to the server
accelerometerData
   .collect(.byTime(DispatchQueue.main, .seconds(5))) // collect for 5 sec
   .sink { dataArray in
       // send to server
       postAccelerometerData(payload: dataArray) { success in
           print("success:", success)
       }
   }
   .store(in: &cancellables) 

以上是一个简化的示例 - 它不处理加速度计错误或网络错误 - 因为它似乎超出了您在此问题中的要求。但是,如果您需要处理网络错误并重试 - 那么您可以将 postAccelerometerData 包装在 Future 中并将其集成到 Combine 管道中。

已编辑

另一种选择是使用 throttle(for:scheduler:latest:)@Published dataArray:

假设 postAccelerometerData(payload:) 显示 data 让我们考虑以下 - 自愿 - 简单视图:

struct ContentView: View {
    @ObservedObject var viewModel = ViewModel()
    var body: some View {
        Text(viewModel.description)
            .padding()
    }
}

struct ContentView_Previews: PreviewProvider {
    static var previews: some View {
        ContentView(viewModel: ViewModel())
    }
}

为了避免过于频繁地更新视图,最好将运动管理器包装在一个单独的 class :

class MotionManagerObserver {
    var motionManager: CMMotionManager
    var cancellables: Set<AnyCancellable> = []
    @Published var data: CMAccelerometerData? = nil
    init() {
        motionManager = CMMotionManager()
        // "start" the accelerometer
        motionManager.startAccelerometerUpdates(to: .main) { data, _  in
            self.data = data
        }
    }
}

现在ViewModel 只听$dataArray:

class ViewModel: ObservableObject, CustomStringConvertible {
    var cancellables: Set<AnyCancellable> = []
    let observer: MotionManagerObserver
    @Published var description: String
    
    init() {
        description = ""
        observer = MotionManagerObserver()
        // the magic happens here with `throttle(for:scheduler:latest:)`
        // `postAccelerometerData(payload:)` will be called every 5s
        // and the `Text` view will be updated
        observer.$data
            .throttle(for: 5, scheduler: RunLoop.main, latest: true)
            .sink(receiveValue: postAccelerometerData(payload:))
            .store(in: &cancellables)
    }
  
    func postAccelerometerData(payload: CMAccelerometerData?) {
        description = payload?.description ?? "N/A"
    }
}

这里的主要思想是:

motionManager.startAccelerometerUpdates(to: .main) { data, _  in
    self.data = data
}

你每次收到更新时都会写入 data 所以如果它是 @Published 你可以简单地使用它的发布者和 throttle 到 post 更新到服务器您选择的间隔。当然,如果您的网络层需要处理错误并重试,它仍然是可能的,但最好将其单独处理