Swift Combine,如何取消执行
Swift Combine, how to cancel the execution
我是 Combine 的新手,我正在尝试通过解决“老”问题来理解它
我的目标是使进程可取消,但即使我在 printFizzbuzz()
方法之后(或带有排序延迟)调用了 .cancel()
方法,代码仍然保持 运行 (大约 3 秒)直到完成
我在新的Xcode项目中试过下面的代码,还是一样
import Foundation
import Combine
enum PrinterError: Error {
case indexError(String)
case subscriptionError(String)
var description: String {
switch self {
case .indexError(let descpription):
return descpription
case .subscriptionError(let description):
return description
}
}
}
struct FizzbuzzPrinter {
private var subscriptions = Set<AnyCancellable>()
mutating func printFizzbuzz(fromIndex: Int, toIndex: Int, handler: @escaping (_ result: Result<Int,PrinterError>) -> Void) {
guard toIndex > fromIndex else {
handler(.failure(.indexError("toIndex must larger than fromIndex")))
return
}
var currentIndex: Int = fromIndex
Array<Int>(fromIndex ..< toIndex).publisher
.handleEvents(receiveOutput: { index in
currentIndex = index
}, receiveCancel: {
handler(.failure(.subscriptionError("cancaled at \(currentIndex)")))
})
.map { number -> String in
switch (number.isMultiple(of: 3), number.isMultiple(of: 5) ) {
case (true, true):
return "fizzbuzz at \(number)"
case (true, false):
return "fizz at \(number)"
case (false, true):
return "buzz at \(number)"
case (false, false):
return String()
}
}
.filter{ ![=11=].isEmpty }
.sink { _ in
handler(.success(currentIndex))
} receiveValue: { print([=11=])}
.store(in: &subscriptions)
}
mutating func cancelAll() {
subscriptions.forEach{ [=11=].cancel()}
}
}
var fizzBuzzPrinter = FizzbuzzPrinter()
DispatchQueue.main.async {
fizzBuzzPrinter.printFizzbuzz(fromIndex: 1, toIndex: 60001) { result in
switch result {
case .failure(let printerError):
print(printerError.description)
case .success(let finishedIndex):
print("finished at \(finishedIndex)")
}
}
}
DispatchQueue.main.async {
fizzBuzzPrinter.cancelAll()
}
DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) {
fizzBuzzPrinter.cancelAll()
}
代码打印(最后 10 行):
fizz at 59982
fizzbuzz at 59985
fizz at 59988
buzz at 59990
fizz at 59991
fizz at 59994
buzz at 59995
fizz at 59997
fizzbuzz at 60000
finished at 60000
我也试过使用.switchToLatest()操作符,还是无法取消
struct FizzbuzzPrinter {
private var subscriptions = Set<AnyCancellable>()
private let publishers = PassthroughSubject<AnyPublisher<Int, Never>, Never>()
private let finishPublisher = PassthroughSubject<Int,Never>()
mutating func printFizzbuzz(fromIndex: Int, toIndex: Int, handler: @escaping (_ result: Result<Int,PrinterError>) -> Void) {
guard toIndex > fromIndex else {
handler(.failure(.indexError("toIndex must larger than fromIndex")))
return
}
var currentIndex: Int = fromIndex
publishers
.switchToLatest()
.handleEvents(receiveOutput: { index in
currentIndex = index
}, receiveCancel: {
handler(.failure(.subscriptionError("cancaled at \(currentIndex)")))
})
.map { number -> String in
switch (number.isMultiple(of: 3), number.isMultiple(of: 5) ) {
case (true, true):
return "fizzbuzz at \(number)"
case (true, false):
return "fizz at \(number)"
case (false, true):
return "buzz at \(number)"
case (false, false):
return String()
}
}
.filter{ ![=13=].isEmpty }
.sink { _ in
handler(.success(currentIndex))
} receiveValue: { print([=13=])}
.store(in: &subscriptions)
publishers.send(Array<Int>(fromIndex ..< toIndex)
.publisher
.eraseToAnyPublisher())
}
mutating func cancel() {
publishers.send(finishPublisher.eraseToAnyPublisher())
finishPublisher.send(completion: .finished)
}
}
var fizzBuzzPrinter = FizzbuzzPrinter()
DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) {
fizzBuzzPrinter.cancel()
}
fizzBuzzPrinter.printFizzbuzz(fromIndex: 1, toIndex: 60001) { result in
switch result {
case .failure(let printerError):
print(printerError.description)
case .success(let finishedIndex):
print("finished at \(finishedIndex)")
}
}
我觉得我哪里弄错了,但我想不通。
++++++++++++++++++更新:++++++++++++++++++
谢谢@matt
我将发布者更改为基于计时器的发布者,现在可以使用了
Timer.publish(every: 0.1, on: .main, in: .common)
.autoconnect()
.scan(fromIndex) { current, _ in
current + 1
}
.prefix(toIndex - fromIndex)
++++++++++++++++++更新:2 +++++++++++++++++++
基于@matt 评论
.subscribe(on: DispatchQueue(label: "serial queue"))
和@matt Link:link最后一节“施加背压”
.flatMap(maxPublishers: .max(1)){ num in
Just(num).delay(for: .seconds(0.01), scheduler: DispatchQueue.main)
}
两者都有效
谢谢!!
++++++++++++++++++更新:3 +++++++++++++++++++
我修改了我以前的代码,使其变得更通用,并发现使用
.subscribe(on: DispatchQueue.global(qos: .background))
和
.receive(on: DispatchQueue.main)
最适合我,速度不受硬编码的时间间隔限制,不会互相阻塞,可单独取消
我调用了下面的代码(不包括一些不相关的逻辑)
typealias CancelableIntTaskType = CancelableTask<AnyPublisher<Int, Never>>
var cancelableIntTasks = CancelableIntTaskType()
var taskIdArray = [UUID?]()
taskIdArray = [
startFizzBuzzTask(50, -80, &cancelableIntTasks),
startFizzBuzzTask(100, 200, &cancelableIntTasks),
startFizzBuzzTask(600, 1000, &cancelableIntTasks),
startFizzBuzzTask(2000, 2600, &cancelableIntTasks)
]
checkAllTasksStarted(taskIdArray)
DispatchQueue.main.asyncAfter(deadline: .now() + 0.0001) {
if let id = taskIdArray.compactMap({[=19=]}).first {
cancelableIntTasks.cancelTaskWithID(id)
}
}
DispatchQueue.main.asyncAfter(deadline: .now() + 0.0002) {
cancelableIntTasks.cancelAll()
}
产出
*id 52F started
id:52F val: 100 progress 0.0%
id:52F val: 101 progress 1.0%
id:52F val: 102 progress 2.0%
id:52F val: 103 progress 3.0%
id:52F val: 104 progress 4.0%
id:52F val: 105 progress 5.0%
id:52F val: 106 progress 6.0%
id:52F val: 107 progress 8.0%
id:52F val: 108 progress 8.0%
id:52F val: 109 progress 9.0%
id:52F val: 110 progress 10.0%
id:52F val: 111 progress 11.0%
id:52F val: 112 progress 12.0%
*id 9EA started
id:52F val: 113 progress 13.0%
id:52F val: 114 progress 15.0%
id:52F val: 115 progress 15.0%
*id 97B started
id:52F val: 116 progress 16.0%
id:52F val: 117 progress 17.0%
id:52F val: 118 progress 18.0%
id:9EA val: 600 progress 0.0%
id:9EA val: 601 progress 1.0%
id:97B val: 2000 progress 0.0%
id:9EA val: 602 progress 1.0%
id:9EA val: 603 progress 1.0%
id:9EA val: 604 progress 1.0%
id:97B val: 2001 progress 1.0%
id:9EA val: 605 progress 2.0%
id:97B val: 2002 progress 1.0%
id:52F val: 119 progress 19.0%
id:9EA val: 606 progress 2.0%
4 tasks initiated 3 started
task at index 0 failed
id:97B val: 2003 progress 1.0%
id:52F val: 120 progress 20.0%
id:9EA val: 607 progress 2.0%
id:97B val: 2004 progress 1.0%
id:9EA val: 608 progress 2.0%
id:52F val: 121 progress 21.0%
id:97B val: 2005 progress 1.0%
id:9EA val: 609 progress 3.0%
id:97B val: 2006 progress 1.0%
id:9EA val: 610 progress 3.0%
id:97B val: 2007 progress 2.0%
id:9EA val: 611 progress 3.0%
id:97B val: 2008 progress 2.0%
id:9EA val: 612 progress 3.0%
id:97B val: 2009 progress 2.0%
id:9EA val: 613 progress 4.0%
id:97B val: 2010 progress 2.0%
id:9EA val: 614 progress 4.0%
id:97B val: 2011 progress 2.0%
id:52F val: 122 progress 22.0%
id:9EA val: 615 progress 4.0%
id:52F val: 123 progress 23.0%
id:9EA val: 616 progress 4.0%
id:52F val: 124 progress 24.0%
id:9EA val: 617 progress 5.0%
id:52F val: 125 progress 25.0%
id:97B val: 2012 progress 2.0%
id:9EA val: 618 progress 5.0%
**52F canceled at 125 progress: 25.0%
**97B canceled at 2012 progress: 2.0%
**9EA canceled at 619 progress: 5.0%
id:9EA val: 619 progress 5.0%
id:52F val: 126 progress 26.0%
+++++++++++++++++++更新 4+++++++++++++++++++
刚刚制作了一个学习取消/恢复任务方法的项目
问题是你的publisher做的太粗糙了:不是异步的。数组发布者只是一次发布它的所有值,所以你取消得太晚了;你正在阻塞主线程。改用计时器发布器之类的东西,或者使用带有延迟和背压的平面图。
我是 Combine 的新手,我正在尝试通过解决“老”问题来理解它
我的目标是使进程可取消,但即使我在 printFizzbuzz()
方法之后(或带有排序延迟)调用了 .cancel()
方法,代码仍然保持 运行 (大约 3 秒)直到完成
我在新的Xcode项目中试过下面的代码,还是一样
import Foundation
import Combine
enum PrinterError: Error {
case indexError(String)
case subscriptionError(String)
var description: String {
switch self {
case .indexError(let descpription):
return descpription
case .subscriptionError(let description):
return description
}
}
}
struct FizzbuzzPrinter {
private var subscriptions = Set<AnyCancellable>()
mutating func printFizzbuzz(fromIndex: Int, toIndex: Int, handler: @escaping (_ result: Result<Int,PrinterError>) -> Void) {
guard toIndex > fromIndex else {
handler(.failure(.indexError("toIndex must larger than fromIndex")))
return
}
var currentIndex: Int = fromIndex
Array<Int>(fromIndex ..< toIndex).publisher
.handleEvents(receiveOutput: { index in
currentIndex = index
}, receiveCancel: {
handler(.failure(.subscriptionError("cancaled at \(currentIndex)")))
})
.map { number -> String in
switch (number.isMultiple(of: 3), number.isMultiple(of: 5) ) {
case (true, true):
return "fizzbuzz at \(number)"
case (true, false):
return "fizz at \(number)"
case (false, true):
return "buzz at \(number)"
case (false, false):
return String()
}
}
.filter{ ![=11=].isEmpty }
.sink { _ in
handler(.success(currentIndex))
} receiveValue: { print([=11=])}
.store(in: &subscriptions)
}
mutating func cancelAll() {
subscriptions.forEach{ [=11=].cancel()}
}
}
var fizzBuzzPrinter = FizzbuzzPrinter()
DispatchQueue.main.async {
fizzBuzzPrinter.printFizzbuzz(fromIndex: 1, toIndex: 60001) { result in
switch result {
case .failure(let printerError):
print(printerError.description)
case .success(let finishedIndex):
print("finished at \(finishedIndex)")
}
}
}
DispatchQueue.main.async {
fizzBuzzPrinter.cancelAll()
}
DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) {
fizzBuzzPrinter.cancelAll()
}
代码打印(最后 10 行):
fizz at 59982
fizzbuzz at 59985
fizz at 59988
buzz at 59990
fizz at 59991
fizz at 59994
buzz at 59995
fizz at 59997
fizzbuzz at 60000
finished at 60000
我也试过使用.switchToLatest()操作符,还是无法取消
struct FizzbuzzPrinter {
private var subscriptions = Set<AnyCancellable>()
private let publishers = PassthroughSubject<AnyPublisher<Int, Never>, Never>()
private let finishPublisher = PassthroughSubject<Int,Never>()
mutating func printFizzbuzz(fromIndex: Int, toIndex: Int, handler: @escaping (_ result: Result<Int,PrinterError>) -> Void) {
guard toIndex > fromIndex else {
handler(.failure(.indexError("toIndex must larger than fromIndex")))
return
}
var currentIndex: Int = fromIndex
publishers
.switchToLatest()
.handleEvents(receiveOutput: { index in
currentIndex = index
}, receiveCancel: {
handler(.failure(.subscriptionError("cancaled at \(currentIndex)")))
})
.map { number -> String in
switch (number.isMultiple(of: 3), number.isMultiple(of: 5) ) {
case (true, true):
return "fizzbuzz at \(number)"
case (true, false):
return "fizz at \(number)"
case (false, true):
return "buzz at \(number)"
case (false, false):
return String()
}
}
.filter{ ![=13=].isEmpty }
.sink { _ in
handler(.success(currentIndex))
} receiveValue: { print([=13=])}
.store(in: &subscriptions)
publishers.send(Array<Int>(fromIndex ..< toIndex)
.publisher
.eraseToAnyPublisher())
}
mutating func cancel() {
publishers.send(finishPublisher.eraseToAnyPublisher())
finishPublisher.send(completion: .finished)
}
}
var fizzBuzzPrinter = FizzbuzzPrinter()
DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) {
fizzBuzzPrinter.cancel()
}
fizzBuzzPrinter.printFizzbuzz(fromIndex: 1, toIndex: 60001) { result in
switch result {
case .failure(let printerError):
print(printerError.description)
case .success(let finishedIndex):
print("finished at \(finishedIndex)")
}
}
我觉得我哪里弄错了,但我想不通。
++++++++++++++++++更新:++++++++++++++++++
谢谢@matt 我将发布者更改为基于计时器的发布者,现在可以使用了
Timer.publish(every: 0.1, on: .main, in: .common)
.autoconnect()
.scan(fromIndex) { current, _ in
current + 1
}
.prefix(toIndex - fromIndex)
++++++++++++++++++更新:2 +++++++++++++++++++
基于@matt 评论
.subscribe(on: DispatchQueue(label: "serial queue"))
和@matt Link:link最后一节“施加背压”
.flatMap(maxPublishers: .max(1)){ num in
Just(num).delay(for: .seconds(0.01), scheduler: DispatchQueue.main)
}
两者都有效
谢谢!!
++++++++++++++++++更新:3 +++++++++++++++++++
我修改了我以前的代码,使其变得更通用,并发现使用
.subscribe(on: DispatchQueue.global(qos: .background))
和
.receive(on: DispatchQueue.main)
最适合我,速度不受硬编码的时间间隔限制,不会互相阻塞,可单独取消
我调用了下面的代码(不包括一些不相关的逻辑)
typealias CancelableIntTaskType = CancelableTask<AnyPublisher<Int, Never>>
var cancelableIntTasks = CancelableIntTaskType()
var taskIdArray = [UUID?]()
taskIdArray = [
startFizzBuzzTask(50, -80, &cancelableIntTasks),
startFizzBuzzTask(100, 200, &cancelableIntTasks),
startFizzBuzzTask(600, 1000, &cancelableIntTasks),
startFizzBuzzTask(2000, 2600, &cancelableIntTasks)
]
checkAllTasksStarted(taskIdArray)
DispatchQueue.main.asyncAfter(deadline: .now() + 0.0001) {
if let id = taskIdArray.compactMap({[=19=]}).first {
cancelableIntTasks.cancelTaskWithID(id)
}
}
DispatchQueue.main.asyncAfter(deadline: .now() + 0.0002) {
cancelableIntTasks.cancelAll()
}
产出
*id 52F started
id:52F val: 100 progress 0.0%
id:52F val: 101 progress 1.0%
id:52F val: 102 progress 2.0%
id:52F val: 103 progress 3.0%
id:52F val: 104 progress 4.0%
id:52F val: 105 progress 5.0%
id:52F val: 106 progress 6.0%
id:52F val: 107 progress 8.0%
id:52F val: 108 progress 8.0%
id:52F val: 109 progress 9.0%
id:52F val: 110 progress 10.0%
id:52F val: 111 progress 11.0%
id:52F val: 112 progress 12.0%
*id 9EA started
id:52F val: 113 progress 13.0%
id:52F val: 114 progress 15.0%
id:52F val: 115 progress 15.0%
*id 97B started
id:52F val: 116 progress 16.0%
id:52F val: 117 progress 17.0%
id:52F val: 118 progress 18.0%
id:9EA val: 600 progress 0.0%
id:9EA val: 601 progress 1.0%
id:97B val: 2000 progress 0.0%
id:9EA val: 602 progress 1.0%
id:9EA val: 603 progress 1.0%
id:9EA val: 604 progress 1.0%
id:97B val: 2001 progress 1.0%
id:9EA val: 605 progress 2.0%
id:97B val: 2002 progress 1.0%
id:52F val: 119 progress 19.0%
id:9EA val: 606 progress 2.0%
4 tasks initiated 3 started
task at index 0 failed
id:97B val: 2003 progress 1.0%
id:52F val: 120 progress 20.0%
id:9EA val: 607 progress 2.0%
id:97B val: 2004 progress 1.0%
id:9EA val: 608 progress 2.0%
id:52F val: 121 progress 21.0%
id:97B val: 2005 progress 1.0%
id:9EA val: 609 progress 3.0%
id:97B val: 2006 progress 1.0%
id:9EA val: 610 progress 3.0%
id:97B val: 2007 progress 2.0%
id:9EA val: 611 progress 3.0%
id:97B val: 2008 progress 2.0%
id:9EA val: 612 progress 3.0%
id:97B val: 2009 progress 2.0%
id:9EA val: 613 progress 4.0%
id:97B val: 2010 progress 2.0%
id:9EA val: 614 progress 4.0%
id:97B val: 2011 progress 2.0%
id:52F val: 122 progress 22.0%
id:9EA val: 615 progress 4.0%
id:52F val: 123 progress 23.0%
id:9EA val: 616 progress 4.0%
id:52F val: 124 progress 24.0%
id:9EA val: 617 progress 5.0%
id:52F val: 125 progress 25.0%
id:97B val: 2012 progress 2.0%
id:9EA val: 618 progress 5.0%
**52F canceled at 125 progress: 25.0%
**97B canceled at 2012 progress: 2.0%
**9EA canceled at 619 progress: 5.0%
id:9EA val: 619 progress 5.0%
id:52F val: 126 progress 26.0%
+++++++++++++++++++更新 4+++++++++++++++++++
刚刚制作了一个学习取消/恢复任务方法的项目
问题是你的publisher做的太粗糙了:不是异步的。数组发布者只是一次发布它的所有值,所以你取消得太晚了;你正在阻塞主线程。改用计时器发布器之类的东西,或者使用带有延迟和背压的平面图。