addTask 和 addTaskUnlessCancelled 之间的结构化并发差异 Swift
Structured Concurrency difference between addTask and addTaskUnlessCancelled Swift
我对调用 addTask()
和 addTaskUnlessCancelled
感到困惑。根据定义 addTask()
在您的组中将无条件地向 group
添加一个新任务
func testCancellation() async {
do {
try await withThrowingTaskGroup(of: Void.self) { group -> Void in
group.addTaskUnlessCancelled {
print("added")
try await Task.sleep(nanoseconds: 1_000_000_000)
throw ExampleError.badURL
}
group.addTaskUnlessCancelled {
print("added")
try await Task.sleep(nanoseconds: 2_000_000_000)
print("Task is cancelled: \(Task.isCancelled)")
}
group.addTaskUnlessCancelled {
print("added")
try await Task.sleep(nanoseconds: 5_000_000_000)
print("Task is cancelled: \(Task.isCancelled)")
}
group.cancelAll()
try await group.next()
}
} catch {
print("Error thrown: \(error.localizedDescription)")
}
}
如果要避免将任务添加到已取消的组中,我们必须使用addTaskUnlessCancelled()
方法。但即使使用 group.cancelAll()
,它也会将所有任务添加到组中。那么这里和 returning 值有什么区别 return true
仅当 Task
抛出一些错误?
简答
addTaskUnlessCancelled
的优点在于:
- 如果群被取消了,任务也不会开始;和
- 如果组已取消,它允许您添加提前退出。
如果您有兴趣,可以比较 addTask
with that of addTaskUnlessCancelled
的源代码,就在它的正下方。
长答案
我认为如果我们改变任务组随着时间的推移处理请求(例如消耗 AsyncSequence
),这个问题是最好的例证。
考虑以下因素
func a() async {
await withTaskGroup(of: Void.self) { group in
for await i in tickSequence() {
group.addTask(operation: { await self.b() })
if i == 3 {
group.cancelAll()
}
}
}
}
func b() async {
let start = CACurrentMediaTime()
while CACurrentMediaTime() - start < 3 { }
}
func tickSequence() -> AsyncStream<Int> {
AsyncStream { continuation in
Task {
for i in 0 ..< 12 {
try await Task.sleep(nanoseconds: NSEC_PER_SEC / 2)
continuation.yield(i)
}
continuation.finish()
}
}
}
在此示例中,b
正在等待滴答序列事件,为每个事件调用 c
。但是我们在添加第四个任务后取消了该组。结果如下:
所以我们可以看到路标ⓢ,在添加第四个任务后,团被取消了,但是没有受到影响。
但那是因为 b
没有响应取消。那么假设你解决了这个问题:
func b() async {
let start = CACurrentMediaTime()
while CACurrentMediaTime() - start < 3 {
if Task.isCancelled { return }
}
}
您现在看到如下行为:
更好,因为 b
现在正在取消,但 a
仍在尝试向群组添加任务,即使它已被取消。并且你可以看到 b
重复 运行 (虽然至少它现在立即终止)并且 a
启动的任务没有及时完成。
但是,如果你使用group.addTaskUnlessCancelled
,它不仅根本不会向组中添加新任务(即,它不依赖于任务的取消功能),而且还会让你退出取消群时。
func a() async {
await withTaskGroup(of: Void.self) { group in
for await i in tickSequence() {
guard group.addTaskUnlessCancelled(operation: { await self.b() })
else { break }
if i == 3 {
group.cancelAll()
}
}
}
}
这会产生所需的行为:
显然,这是一个相当做作的示例,明确构造以说明差异,但在许多情况下,addTask
和 addTaskUnlessCancelled
之间的差异不那么明显。但希望以上说明了差异。
最重要的是,如果您可能在向该组添加其他任务的过程中取消组,建议 addTaskUnlessCancelled
。话虽如此,如果您不取消群组(例如,取消任务比取消群组更常见,恕我直言),尚不完全清楚真正需要多久 addTaskUnlessCancelled
。
请注意,在上面,我删除了所有 OSLog
和路标代码以在 Xcode 仪器中生成上述所有间隔和路标(因为我不想分散手头的问题)。但是,如果您有兴趣,这就是实际代码:
import os.log
private let log = OSLog(subsystem: "Test", category: .pointsOfInterest)
func a() async {
await withTaskGroup(of: Void.self) { group in
let id = log.begin(name: #function, "begin")
defer { log.end(name: #function, "end", id: id) }
for await i in tickSequence() {
guard group.addTaskUnlessCancelled(operation: { await self.b() }) else { break }
if i == 3 {
log.event(name: #function, "Cancel")
group.cancelAll()
}
}
}
}
func b() async {
let id = log.begin(name: #function, "begin")
defer { log.end(name: #function, "end", id: id) }
let start = CACurrentMediaTime()
while CACurrentMediaTime() - start < 3 {
if Task.isCancelled { return }
}
}
func tickSequence() -> AsyncStream<Int> {
AsyncStream { continuation in
Task {
for i in 0 ..< 12 {
try await Task.sleep(nanoseconds: NSEC_PER_SEC / 2)
continuation.yield(i)
}
continuation.finish()
}
}
}
有
extension OSLog {
func event(name: StaticString = "Points", _ string: String) {
os_signpost(.event, log: self, name: name, "%{public}@", string)
}
/// Manually begin an interval
func begin(name: StaticString = "Intervals", _ string: String) -> OSSignpostID {
let id = OSSignpostID(log: self)
os_signpost(.begin, log: self, name: name, signpostID: id, "%{public}@", string)
return id
}
/// Manually end an interval
func end(name: StaticString = "Intervals", _ string: String, id: OSSignpostID) {
os_signpost(.end, log: self, name: name, signpostID: id, "%{public}@", string)
}
func interval<T>(name: StaticString = "Intervals", _ string: String, block: () throws -> T) rethrows -> T {
let id = OSSignpostID(log: self)
os_signpost(.begin, log: self, name: name, signpostID: id, "%{public}@", string)
defer { os_signpost(.end, log: self, name: name, signpostID: id, "%{public}@", string) }
return try block()
}
}
我对调用 addTask()
和 addTaskUnlessCancelled
感到困惑。根据定义 addTask()
在您的组中将无条件地向 group
func testCancellation() async {
do {
try await withThrowingTaskGroup(of: Void.self) { group -> Void in
group.addTaskUnlessCancelled {
print("added")
try await Task.sleep(nanoseconds: 1_000_000_000)
throw ExampleError.badURL
}
group.addTaskUnlessCancelled {
print("added")
try await Task.sleep(nanoseconds: 2_000_000_000)
print("Task is cancelled: \(Task.isCancelled)")
}
group.addTaskUnlessCancelled {
print("added")
try await Task.sleep(nanoseconds: 5_000_000_000)
print("Task is cancelled: \(Task.isCancelled)")
}
group.cancelAll()
try await group.next()
}
} catch {
print("Error thrown: \(error.localizedDescription)")
}
}
如果要避免将任务添加到已取消的组中,我们必须使用addTaskUnlessCancelled()
方法。但即使使用 group.cancelAll()
,它也会将所有任务添加到组中。那么这里和 returning 值有什么区别 return true
仅当 Task
抛出一些错误?
简答
addTaskUnlessCancelled
的优点在于:
- 如果群被取消了,任务也不会开始;和
- 如果组已取消,它允许您添加提前退出。
如果您有兴趣,可以比较 addTask
with that of addTaskUnlessCancelled
的源代码,就在它的正下方。
长答案
我认为如果我们改变任务组随着时间的推移处理请求(例如消耗 AsyncSequence
),这个问题是最好的例证。
考虑以下因素
func a() async {
await withTaskGroup(of: Void.self) { group in
for await i in tickSequence() {
group.addTask(operation: { await self.b() })
if i == 3 {
group.cancelAll()
}
}
}
}
func b() async {
let start = CACurrentMediaTime()
while CACurrentMediaTime() - start < 3 { }
}
func tickSequence() -> AsyncStream<Int> {
AsyncStream { continuation in
Task {
for i in 0 ..< 12 {
try await Task.sleep(nanoseconds: NSEC_PER_SEC / 2)
continuation.yield(i)
}
continuation.finish()
}
}
}
在此示例中,b
正在等待滴答序列事件,为每个事件调用 c
。但是我们在添加第四个任务后取消了该组。结果如下:
所以我们可以看到路标ⓢ,在添加第四个任务后,团被取消了,但是没有受到影响。
但那是因为 b
没有响应取消。那么假设你解决了这个问题:
func b() async {
let start = CACurrentMediaTime()
while CACurrentMediaTime() - start < 3 {
if Task.isCancelled { return }
}
}
您现在看到如下行为:
更好,因为 b
现在正在取消,但 a
仍在尝试向群组添加任务,即使它已被取消。并且你可以看到 b
重复 运行 (虽然至少它现在立即终止)并且 a
启动的任务没有及时完成。
但是,如果你使用group.addTaskUnlessCancelled
,它不仅根本不会向组中添加新任务(即,它不依赖于任务的取消功能),而且还会让你退出取消群时。
func a() async {
await withTaskGroup(of: Void.self) { group in
for await i in tickSequence() {
guard group.addTaskUnlessCancelled(operation: { await self.b() })
else { break }
if i == 3 {
group.cancelAll()
}
}
}
}
这会产生所需的行为:
显然,这是一个相当做作的示例,明确构造以说明差异,但在许多情况下,addTask
和 addTaskUnlessCancelled
之间的差异不那么明显。但希望以上说明了差异。
最重要的是,如果您可能在向该组添加其他任务的过程中取消组,建议 addTaskUnlessCancelled
。话虽如此,如果您不取消群组(例如,取消任务比取消群组更常见,恕我直言),尚不完全清楚真正需要多久 addTaskUnlessCancelled
。
请注意,在上面,我删除了所有 OSLog
和路标代码以在 Xcode 仪器中生成上述所有间隔和路标(因为我不想分散手头的问题)。但是,如果您有兴趣,这就是实际代码:
import os.log
private let log = OSLog(subsystem: "Test", category: .pointsOfInterest)
func a() async {
await withTaskGroup(of: Void.self) { group in
let id = log.begin(name: #function, "begin")
defer { log.end(name: #function, "end", id: id) }
for await i in tickSequence() {
guard group.addTaskUnlessCancelled(operation: { await self.b() }) else { break }
if i == 3 {
log.event(name: #function, "Cancel")
group.cancelAll()
}
}
}
}
func b() async {
let id = log.begin(name: #function, "begin")
defer { log.end(name: #function, "end", id: id) }
let start = CACurrentMediaTime()
while CACurrentMediaTime() - start < 3 {
if Task.isCancelled { return }
}
}
func tickSequence() -> AsyncStream<Int> {
AsyncStream { continuation in
Task {
for i in 0 ..< 12 {
try await Task.sleep(nanoseconds: NSEC_PER_SEC / 2)
continuation.yield(i)
}
continuation.finish()
}
}
}
有
extension OSLog {
func event(name: StaticString = "Points", _ string: String) {
os_signpost(.event, log: self, name: name, "%{public}@", string)
}
/// Manually begin an interval
func begin(name: StaticString = "Intervals", _ string: String) -> OSSignpostID {
let id = OSSignpostID(log: self)
os_signpost(.begin, log: self, name: name, signpostID: id, "%{public}@", string)
return id
}
/// Manually end an interval
func end(name: StaticString = "Intervals", _ string: String, id: OSSignpostID) {
os_signpost(.end, log: self, name: name, signpostID: id, "%{public}@", string)
}
func interval<T>(name: StaticString = "Intervals", _ string: String, block: () throws -> T) rethrows -> T {
let id = OSSignpostID(log: self)
os_signpost(.begin, log: self, name: name, signpostID: id, "%{public}@", string)
defer { os_signpost(.end, log: self, name: name, signpostID: id, "%{public}@", string) }
return try block()
}
}