合并:发布者有时会失去价值并完成
Combine: Publisher sometimes loses value and completes
我有一个简单的 Deferred Publisher
从磁盘读取数据并在 SwiftUI List
中显示数据,Publisher
大多数情况下工作正常,但是 有时 它表现不佳,它只是失去了它的价值(这是一个 Model
对象的数组)并以 finished
消息完成.我已经尝试了 here 提到的解决方法,使用 buffer
运算符将值保存在缓冲区中,因为我相信 Combine's Publisher
设计不会在没有需求的情况下将数据传递到下游订阅者请求并因此删除此数据并完成,但是使用 buffer
没有解决问题。
我的代码:
enum FileError: Error {
case someError
}
class ViewModel: ObservableObject {
@Published var modelArray = [Model]()
private var subscriptions = Set<AnyCancellable>()
func readData() {
DataSource()
.readFromBundle(resource: "Sample", type: "json")
.receive(on: DispatchQueue.main)
.sink(receiveCompletion: { completion in
print("Completion: \(completion)")
}) { array in
self.modelArray = array
}.store(in: &subscriptions)
}
}
struct ContentView: View {
@ObservedObject var viewModel: ViewModel
var body: some View {
VStack {
List(self.viewModel.modelArray) { model in
Text("\(model.name)")
}
}
.onAppear {
self.viewModel.readData()
}
}
}
struct Model: Codable, Identifiable {
var id: Int
var name: String
}
class DataSource {
private let readQueue = DispatchQueue(label: "ReadQueue", qos: .default, attributes: .concurrent)
func readFromBundle (resource: String, type:String) -> AnyPublisher<[Model], FileError> {
Deferred {
Future { promise in
guard let url = Bundle.main.url(forResource: "Sample", withExtension: "json"),
let data = try? Data(contentsOf: url),
let modelArray = try? JSONDecoder().decode([Model].self, from: data)
else {
promise(.failure(.someError))
return
}
promise(.success(modelArray))
}
}
.receive(on: self.readQueue)
.eraseToAnyPublisher()
}
}
This is a link 下载工作示例项目。
编辑:
环境:Xcode 11.3.1,iOS 13.3 iPhone 11 Pro Max 模拟器和设备。
gif 截图(注意控制台输出)
EDIT2:
如果我添加任何下游发布者,例如 combineLatest
,例如在消费者函数 sink
之前 readData()
,则会引入一个新行为,它链接一个异步发布者 (readFromBundle)使用同步发布者 (combineLatest
) 将导致该值根本不会在 iOS 13.3+
设备上交付,有时会在低于 iOS 13.3
的设备上交付,如 this link 所述。
第一个 运行 没有失败,它只是 "needs" 一次加载它....您可以通过添加它来检查它。
print("ready")
promise(.success(modelArray))
然后设置断点到"not loaded yet",你会看到"not loaded yet"出现在控制台打印"ready"之前。这不是发布者的滴
正如 onAppear() 所说,它将在显示 UI 之后被调用....
if self.viewModel.modelArray.count == 0 {
Text("not loaded yet")
} else {
List(self.viewModel.modelArray) { model in
Text("\(model.name)")
}
}
看起来像赛车问题,请尝试以下(仅通过阅读代码)
1) 显式使用后台队列
private let readQueue = DispatchQueue(label: "ReadQueue", qos: .background,
attributes: .concurrent)
2) 在此队列上安排发布者而不是在其上接收
.subscribe(on: self.readQueue)
查看有关 .receive(on:)
的文档
指定从发布者接收元素的调度程序。
宣言
func receive<S>(on scheduler: S, options: S.SchedulerOptions? = nil) -> Publishers.ReceiveOn<Publishers.SubscribeOn<Deferred<Future<[Model], FileError>>, DispatchQueue>, S> where S : Scheduler
讨论
您使用 receive(on:options:)
运算符在特定调度程序上接收结果,例如在主 运行 循环上执行 UI 工作。与影响上游消息的subscribe(on:options:)
相反,receive(on:options:)
改变了下游消息的执行上下文。在下面的示例中,对 jsonPublisher 的请求是在 backgroundQueue 上执行的,但是从它接收的元素是在 RunLoop.main.
上执行的
let jsonPublisher = MyJSONLoaderPublisher() // Some publisher.
let labelUpdater = MyLabelUpdateSubscriber() // Some subscriber that updates the UI.
jsonPublisher
.subscribe(on: backgroundQueue)
.receiveOn(on: RunLoop.main)
.subscribe(labelUpdater)
参数
调度器
发布者用于元素传递的调度程序。
选项
自定义元素交付的调度程序选项。
Returns
使用指定调度程序传送元素的发布者。
在你的情况下它意味着
import SwiftUI
import Combine
enum FileError: Error {
case someError
}
class ViewModel: ObservableObject {
@Published var modelArray = [Model]()
private var subscriptions = Set<AnyCancellable>()
func readData() {
DataSource()
.readFromBundle(resource: "Sample", type: "json")
.sink(receiveCompletion: { completion in
print("Completion: \(completion)")
}) { array in
print("received value")
self.modelArray = array
}.store(in: &subscriptions)
}
}
struct ContentView: View {
@ObservedObject var viewModel: ViewModel
var body: some View {
VStack {
List(self.viewModel.modelArray) { model in
Text("\(model.name)")
}
}
.onAppear {
self.viewModel.readData()
}
}
}
struct Model: Codable, Identifiable {
var id: Int
var name: String
}
class DataSource {
private let readQueue = DispatchQueue(label: "ReadQueue", qos: .default, attributes: .concurrent)
func readFromBundle (resource: String, type:String) -> AnyPublisher<[Model], FileError> {
Deferred {
Future { promise in
guard let url = Bundle.main.url(forResource: "Sample", withExtension: "json"),
let data = try? Data(contentsOf: url),
let modelArray = try? JSONDecoder().decode([Model].self, from: data)
else {
promise(.failure(.someError))
return
}
promise(.success(modelArray))
}
}
.subscribe(on: readQueue)
.receive(on: RunLoop.main)
.eraseToAnyPublisher()
}
}
这解释了为什么 Asperi 的解决方案有效。区别在于,没有必要在 readData()
中再次调用 .receive(on:)
DispatchQueue.main
和 RunLoop.main
之间的差异在您的示例中并不重要。
代码中的问题在于receive(on:)
中使用的readQueue
是并发的。 value 和 completion 都被分别派发到这个队列,这样 value 和 completion 的顺序就得不到保证。如果下游订阅者首先收到完成,它将取消订阅并忽略该值。使 readQueue
串行工作与直接使用另一个串行队列(例如 DispatchQueue.main
一样。
使用 subscribe(on:)
而不是 receive(on:)
与并发队列结合使用,因为 promise
调用导致值和完成发送一起分派。
我有一个简单的 Deferred Publisher
从磁盘读取数据并在 SwiftUI List
中显示数据,Publisher
大多数情况下工作正常,但是 有时 它表现不佳,它只是失去了它的价值(这是一个 Model
对象的数组)并以 finished
消息完成.我已经尝试了 here 提到的解决方法,使用 buffer
运算符将值保存在缓冲区中,因为我相信 Combine's Publisher
设计不会在没有需求的情况下将数据传递到下游订阅者请求并因此删除此数据并完成,但是使用 buffer
没有解决问题。
我的代码:
enum FileError: Error {
case someError
}
class ViewModel: ObservableObject {
@Published var modelArray = [Model]()
private var subscriptions = Set<AnyCancellable>()
func readData() {
DataSource()
.readFromBundle(resource: "Sample", type: "json")
.receive(on: DispatchQueue.main)
.sink(receiveCompletion: { completion in
print("Completion: \(completion)")
}) { array in
self.modelArray = array
}.store(in: &subscriptions)
}
}
struct ContentView: View {
@ObservedObject var viewModel: ViewModel
var body: some View {
VStack {
List(self.viewModel.modelArray) { model in
Text("\(model.name)")
}
}
.onAppear {
self.viewModel.readData()
}
}
}
struct Model: Codable, Identifiable {
var id: Int
var name: String
}
class DataSource {
private let readQueue = DispatchQueue(label: "ReadQueue", qos: .default, attributes: .concurrent)
func readFromBundle (resource: String, type:String) -> AnyPublisher<[Model], FileError> {
Deferred {
Future { promise in
guard let url = Bundle.main.url(forResource: "Sample", withExtension: "json"),
let data = try? Data(contentsOf: url),
let modelArray = try? JSONDecoder().decode([Model].self, from: data)
else {
promise(.failure(.someError))
return
}
promise(.success(modelArray))
}
}
.receive(on: self.readQueue)
.eraseToAnyPublisher()
}
}
This is a link 下载工作示例项目。
编辑:
环境:Xcode 11.3.1,iOS 13.3 iPhone 11 Pro Max 模拟器和设备。
gif 截图(注意控制台输出)
EDIT2:
如果我添加任何下游发布者,例如 combineLatest
,例如在消费者函数 sink
之前 readData()
,则会引入一个新行为,它链接一个异步发布者 (readFromBundle)使用同步发布者 (combineLatest
) 将导致该值根本不会在 iOS 13.3+
设备上交付,有时会在低于 iOS 13.3
的设备上交付,如 this link 所述。
第一个 运行 没有失败,它只是 "needs" 一次加载它....您可以通过添加它来检查它。
print("ready")
promise(.success(modelArray))
然后设置断点到"not loaded yet",你会看到"not loaded yet"出现在控制台打印"ready"之前。这不是发布者的滴
正如 onAppear() 所说,它将在显示 UI 之后被调用....
if self.viewModel.modelArray.count == 0 {
Text("not loaded yet")
} else {
List(self.viewModel.modelArray) { model in
Text("\(model.name)")
}
}
看起来像赛车问题,请尝试以下(仅通过阅读代码)
1) 显式使用后台队列
private let readQueue = DispatchQueue(label: "ReadQueue", qos: .background,
attributes: .concurrent)
2) 在此队列上安排发布者而不是在其上接收
.subscribe(on: self.readQueue)
查看有关 .receive(on:)
指定从发布者接收元素的调度程序。 宣言
func receive<S>(on scheduler: S, options: S.SchedulerOptions? = nil) -> Publishers.ReceiveOn<Publishers.SubscribeOn<Deferred<Future<[Model], FileError>>, DispatchQueue>, S> where S : Scheduler
讨论
您使用 receive(on:options:)
运算符在特定调度程序上接收结果,例如在主 运行 循环上执行 UI 工作。与影响上游消息的subscribe(on:options:)
相反,receive(on:options:)
改变了下游消息的执行上下文。在下面的示例中,对 jsonPublisher 的请求是在 backgroundQueue 上执行的,但是从它接收的元素是在 RunLoop.main.
let jsonPublisher = MyJSONLoaderPublisher() // Some publisher.
let labelUpdater = MyLabelUpdateSubscriber() // Some subscriber that updates the UI.
jsonPublisher
.subscribe(on: backgroundQueue)
.receiveOn(on: RunLoop.main)
.subscribe(labelUpdater)
参数
调度器
发布者用于元素传递的调度程序。
选项
自定义元素交付的调度程序选项。
Returns
使用指定调度程序传送元素的发布者。
在你的情况下它意味着
import SwiftUI
import Combine
enum FileError: Error {
case someError
}
class ViewModel: ObservableObject {
@Published var modelArray = [Model]()
private var subscriptions = Set<AnyCancellable>()
func readData() {
DataSource()
.readFromBundle(resource: "Sample", type: "json")
.sink(receiveCompletion: { completion in
print("Completion: \(completion)")
}) { array in
print("received value")
self.modelArray = array
}.store(in: &subscriptions)
}
}
struct ContentView: View {
@ObservedObject var viewModel: ViewModel
var body: some View {
VStack {
List(self.viewModel.modelArray) { model in
Text("\(model.name)")
}
}
.onAppear {
self.viewModel.readData()
}
}
}
struct Model: Codable, Identifiable {
var id: Int
var name: String
}
class DataSource {
private let readQueue = DispatchQueue(label: "ReadQueue", qos: .default, attributes: .concurrent)
func readFromBundle (resource: String, type:String) -> AnyPublisher<[Model], FileError> {
Deferred {
Future { promise in
guard let url = Bundle.main.url(forResource: "Sample", withExtension: "json"),
let data = try? Data(contentsOf: url),
let modelArray = try? JSONDecoder().decode([Model].self, from: data)
else {
promise(.failure(.someError))
return
}
promise(.success(modelArray))
}
}
.subscribe(on: readQueue)
.receive(on: RunLoop.main)
.eraseToAnyPublisher()
}
}
这解释了为什么 Asperi 的解决方案有效。区别在于,没有必要在 readData()
DispatchQueue.main
和 RunLoop.main
之间的差异在您的示例中并不重要。
代码中的问题在于receive(on:)
中使用的readQueue
是并发的。 value 和 completion 都被分别派发到这个队列,这样 value 和 completion 的顺序就得不到保证。如果下游订阅者首先收到完成,它将取消订阅并忽略该值。使 readQueue
串行工作与直接使用另一个串行队列(例如 DispatchQueue.main
一样。
使用 subscribe(on:)
而不是 receive(on:)
与并发队列结合使用,因为 promise
调用导致值和完成发送一起分派。