在后台线程中执行 Combine Future 不起作用
Execute Combine Future in background thread is not working
如果你 运行 在 Playground 上这样做:
import Combine
import Foundation
struct User {
let name: String
}
var didAlreadyImportUsers = false
var importUsers: Future<Bool, Never> {
Future { promise in
sleep(5)
promise(.success(true))
}
}
var fetchUsers: Future<[User], Error> {
Future { promise in
promise(.success([User(name: "John"), User(name: "Jack")]))
}
}
var users: AnyPublisher<[User], Error> {
if didAlreadyImportUsers {
return fetchUsers
.receive(on: DispatchQueue.global(qos: .userInitiated))
.eraseToAnyPublisher()
} else {
return importUsers
.receive(on: DispatchQueue.global(qos: .userInitiated))
.setFailureType(to: Error.self)
.combineLatest(fetchUsers)
.map { [=10=].1 }
.eraseToAnyPublisher()
}
}
users
.receive(on: DispatchQueue.global(qos: .userInitiated))
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
})
print("run")
输出将是:
[User(name: "John"), User(name: "Jack")]
run
finished
但我希望得到:
run
[User(name: "John"), User(name: "Jack")]
finished
因为接收器应该运行后台线程中的代码。我在这里缺少什么。
需要刷代码吗:
sleep(5)
promise(.success(true))
在后台线程中?那么
的目的是什么
.receive(on: DispatchQueue.global(qos: .userInitiated))
你的 Future 运行 一经创建,所以在你的情况下,一旦这个 属性 被访问:
var importUsers: Future<Bool, Never> {
Future { promise in
sleep(5)
promise(.success(true))
}
}
并且由于 Future
运行 立即执行,这意味着传递给 promise 的闭包会立即执行,使主线程在继续之前休眠 5 秒。在您的情况下,一旦您访问在主线程上完成的 users
,就会创建 Future
。
receive(on:
影响 sink
(或下游发布者)接收值的线程,而不是创建值的线程。由于在您调用 .sink
时期货已经完成,完成和发出的值会立即传送到 sink
。在后台队列中,但仍然立即。
之后,您终于到达了 print("run")
行。
如果将 sleep(5)
位替换为:
var importUsers: Future<Bool, Never> {
Future { promise in
DispatchQueue.global().asyncAfter(deadline: .now() + 5) {
promise(.success(true))
}
}
}
并对您的订阅代码进行一些小调整:
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
var cancellables = Set<AnyCancellable>()
users
.receive(on: DispatchQueue.global(qos: .userInitiated))
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
}).store(in: &cancellables)
您会看到输出按预期打印,因为初始 future 不会阻塞主线程五秒钟。
或者,如果您继续睡眠并像这样订阅,您会看到相同的输出:
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
var cancellables = Set<AnyCancellable>()
users
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
}).store(in: &cancellables)
原因是您 subscribe
在后台线程上,因此订阅和所有内容都是在主线程之外异步设置的,这导致 print("run")
到 运行 在接收之前Future
的结果。但是,一旦 users
属性 被访问(在主线程上),主线程仍然会休眠 5 秒,因为那是您初始化 Future
的时候。所以整个输出是一次打印出来的,而不是在 "run"
.
之后有 5 秒的睡眠
有一种方便的方法可以达到预期效果。 Combine 有一个 Deferred 发布者等待 subscribe(on:)
成为接收者。所以代码应该是这样的
var fetchUsers: Future<[User], Error> {
return Deferred {
Future { promise in
sleep(5)
promise(.success([User(name: "John"), User(name: "Jack")]))
}
}.eraseToAnyPublisher()
}
var cancellables = Set<AnyCancellable>()
fetchUsers
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
}).store(in: &cancellables)
print("run")
这样的代码不会停止主队列,输出会如预期的那样
run
[User(name: "John"), User(name: "Jack")]
finished
如果你 运行 在 Playground 上这样做:
import Combine
import Foundation
struct User {
let name: String
}
var didAlreadyImportUsers = false
var importUsers: Future<Bool, Never> {
Future { promise in
sleep(5)
promise(.success(true))
}
}
var fetchUsers: Future<[User], Error> {
Future { promise in
promise(.success([User(name: "John"), User(name: "Jack")]))
}
}
var users: AnyPublisher<[User], Error> {
if didAlreadyImportUsers {
return fetchUsers
.receive(on: DispatchQueue.global(qos: .userInitiated))
.eraseToAnyPublisher()
} else {
return importUsers
.receive(on: DispatchQueue.global(qos: .userInitiated))
.setFailureType(to: Error.self)
.combineLatest(fetchUsers)
.map { [=10=].1 }
.eraseToAnyPublisher()
}
}
users
.receive(on: DispatchQueue.global(qos: .userInitiated))
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
})
print("run")
输出将是:
[User(name: "John"), User(name: "Jack")]
run
finished
但我希望得到:
run
[User(name: "John"), User(name: "Jack")]
finished
因为接收器应该运行后台线程中的代码。我在这里缺少什么。 需要刷代码吗:
sleep(5)
promise(.success(true))
在后台线程中?那么
的目的是什么.receive(on: DispatchQueue.global(qos: .userInitiated))
你的 Future 运行 一经创建,所以在你的情况下,一旦这个 属性 被访问:
var importUsers: Future<Bool, Never> {
Future { promise in
sleep(5)
promise(.success(true))
}
}
并且由于 Future
运行 立即执行,这意味着传递给 promise 的闭包会立即执行,使主线程在继续之前休眠 5 秒。在您的情况下,一旦您访问在主线程上完成的 users
,就会创建 Future
。
receive(on:
影响 sink
(或下游发布者)接收值的线程,而不是创建值的线程。由于在您调用 .sink
时期货已经完成,完成和发出的值会立即传送到 sink
。在后台队列中,但仍然立即。
之后,您终于到达了 print("run")
行。
如果将 sleep(5)
位替换为:
var importUsers: Future<Bool, Never> {
Future { promise in
DispatchQueue.global().asyncAfter(deadline: .now() + 5) {
promise(.success(true))
}
}
}
并对您的订阅代码进行一些小调整:
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
var cancellables = Set<AnyCancellable>()
users
.receive(on: DispatchQueue.global(qos: .userInitiated))
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
}).store(in: &cancellables)
您会看到输出按预期打印,因为初始 future 不会阻塞主线程五秒钟。
或者,如果您继续睡眠并像这样订阅,您会看到相同的输出:
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
var cancellables = Set<AnyCancellable>()
users
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
}).store(in: &cancellables)
原因是您 subscribe
在后台线程上,因此订阅和所有内容都是在主线程之外异步设置的,这导致 print("run")
到 运行 在接收之前Future
的结果。但是,一旦 users
属性 被访问(在主线程上),主线程仍然会休眠 5 秒,因为那是您初始化 Future
的时候。所以整个输出是一次打印出来的,而不是在 "run"
.
有一种方便的方法可以达到预期效果。 Combine 有一个 Deferred 发布者等待 subscribe(on:)
成为接收者。所以代码应该是这样的
var fetchUsers: Future<[User], Error> {
return Deferred {
Future { promise in
sleep(5)
promise(.success([User(name: "John"), User(name: "Jack")]))
}
}.eraseToAnyPublisher()
}
var cancellables = Set<AnyCancellable>()
fetchUsers
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
}).store(in: &cancellables)
print("run")
这样的代码不会停止主队列,输出会如预期的那样
run
[User(name: "John"), User(name: "Jack")]
finished