在后台队列上订阅时 Combine Publisher 不会取消
Combine Publisher does not cancel when subscribed to on a background queue
我正在尝试利用 Combine 的能力 subscribe
到不同队列上的上游发布者,但我发现当我这样做时,上游发布者没有被正确取消。
subscribe(on:options:) 的文档进行了以下引用:
Using subscribe(on:options:) also causes the upstream publisher to
perform cancel() using the specified scheduler.
我不完全清楚这句话的含义。在下面的示例代码中,我有一个符合 Sequence
和 IteratorProtocol
的基本类型 NameIterator
。它只是模拟一个自定义迭代器,可能需要很长时间才能获取元素。
Combine 将 .publisher
属性 添加到 Sequence
。如果您在单个线程中订阅该发布者,那么任何取消事件都会正确传播到根发布者并且迭代停止。
但是,如果您调用 subscribe(on:)
,则在耗尽对迭代器的调用之前,取消事件不会到达根发布者。请注意,sink
在这两种情况下都被正确调用,但迭代器会继续被调用,即使没有值被推入管道。
如果迭代器很昂贵并且包含大量项目,这会导致问题,例如遍历文件系统。有没有办法正确取消上游发布者?或者我可能错误地配置了发布者链。
样本迭代器:
struct NameIterator: Sequence, IteratorProtocol {
private let names = ["Alpha", "Beta", "Charlie", "Delta"]
private var index = 0
mutating func next() -> String? {
guard index < names.endIndex else {
return nil
}
// Simulate an expensive iterator.
sleep(1)
defer { index = names.index(after: index) }
print("NameIterator.next() called with \(names[index])")
return names[index]
}
}
正确取消:
let cancellable = NameIterator().publisher
.first()
.sink { print("Sink: \([=11=])") }
// Outputs:
NameIterator.next() called with Alpha
NameIterator.next() called with Beta
Sink: Alpha
错误取消:
let cancellable = NameIterator().publisher
.subscribe(on: DispatchQueue(label: "BackgroundQueue"))
.first()
.sink { print("Sink: \([=12=])") }
// Outputs:
NameIterator.next() called with Alpha
NameIterator.next() called with Beta
Sink: Alpha
NameIterator.next() called with Charlie // This call was unexpected.
NameIterator.next() called with Delta // This call was unexpected.
文档中的引用不是关于取消的声明,而是关于线程的声明。它说:如果您订阅某个队列,那么这就是将要使用的队列 when/if 是时候将取消消息发送到管道上了。
通过选择订阅指定队列,您就是在明确地说:当需要取消时,将该调用排队。因此,对于队列上的任何操作,我们现在不知道什么时候会真正发生。因此,“这个电话不应该发生”的说法是错误的;故事中没有“应该有”。相反,任何关于取消何时会渗透到发布者的期望正是您在队列中订阅时放弃的。
(顺便说一下,Completion 在预期的时刻以良好的顺序到达 管道——也就是说,sink 获得 Alpha 值,紧接着是.finished
完成。只有 发布者 您才给予了所有这些额外的回旋余地。)
我正在尝试利用 Combine 的能力 subscribe
到不同队列上的上游发布者,但我发现当我这样做时,上游发布者没有被正确取消。
subscribe(on:options:) 的文档进行了以下引用:
Using subscribe(on:options:) also causes the upstream publisher to perform cancel() using the specified scheduler.
我不完全清楚这句话的含义。在下面的示例代码中,我有一个符合 Sequence
和 IteratorProtocol
的基本类型 NameIterator
。它只是模拟一个自定义迭代器,可能需要很长时间才能获取元素。
Combine 将 .publisher
属性 添加到 Sequence
。如果您在单个线程中订阅该发布者,那么任何取消事件都会正确传播到根发布者并且迭代停止。
但是,如果您调用 subscribe(on:)
,则在耗尽对迭代器的调用之前,取消事件不会到达根发布者。请注意,sink
在这两种情况下都被正确调用,但迭代器会继续被调用,即使没有值被推入管道。
如果迭代器很昂贵并且包含大量项目,这会导致问题,例如遍历文件系统。有没有办法正确取消上游发布者?或者我可能错误地配置了发布者链。
样本迭代器:
struct NameIterator: Sequence, IteratorProtocol {
private let names = ["Alpha", "Beta", "Charlie", "Delta"]
private var index = 0
mutating func next() -> String? {
guard index < names.endIndex else {
return nil
}
// Simulate an expensive iterator.
sleep(1)
defer { index = names.index(after: index) }
print("NameIterator.next() called with \(names[index])")
return names[index]
}
}
正确取消:
let cancellable = NameIterator().publisher
.first()
.sink { print("Sink: \([=11=])") }
// Outputs:
NameIterator.next() called with Alpha
NameIterator.next() called with Beta
Sink: Alpha
错误取消:
let cancellable = NameIterator().publisher
.subscribe(on: DispatchQueue(label: "BackgroundQueue"))
.first()
.sink { print("Sink: \([=12=])") }
// Outputs:
NameIterator.next() called with Alpha
NameIterator.next() called with Beta
Sink: Alpha
NameIterator.next() called with Charlie // This call was unexpected.
NameIterator.next() called with Delta // This call was unexpected.
文档中的引用不是关于取消的声明,而是关于线程的声明。它说:如果您订阅某个队列,那么这就是将要使用的队列 when/if 是时候将取消消息发送到管道上了。
通过选择订阅指定队列,您就是在明确地说:当需要取消时,将该调用排队。因此,对于队列上的任何操作,我们现在不知道什么时候会真正发生。因此,“这个电话不应该发生”的说法是错误的;故事中没有“应该有”。相反,任何关于取消何时会渗透到发布者的期望正是您在队列中订阅时放弃的。
(顺便说一下,Completion 在预期的时刻以良好的顺序到达 管道——也就是说,sink 获得 Alpha 值,紧接着是.finished
完成。只有 发布者 您才给予了所有这些额外的回旋余地。)