如何使用 Combine buffer operator 施加背压来避免 flatMap 向上游请求无限需求?
How to apply back pressure with Combine buffer operator to avoid flatMap to ask an infinite demand upstream?
我正在尝试使用 Combine 通过网络执行数百万个并发请求。这是我使用的幼稚方法的模型:
import Foundation
import Combine
let cancellable = (0..<1_000_000).publisher
.map(some_preprocessing)
.flatMap(maxPublishers: .max(32)) { request in
URLSession.dataTaskPublisher(for: request)
.map(\.data)
.catch { _ in
return Just(Data())
}
}
.sink { completion in
print(completion)
} receiveValue: { value in
print(value)
}
// Required in a command line tool
sleep(100)
此管道首先创建一个请求,该请求在 flatMap
中完成以限制错误。此外,flatMap
合并了多个请求,使它们有效地同时完成,这很棒。
问题是它实际上会同时发出 1,000,000 个请求,所以我添加了参数 maxPublishers
来限制 flatMap
中同时订阅的发布者数量。这种工作,只有32个publisher同时活跃,可惜some_preprocessing
还是会执行1,000,000次才flatMap
执行
我希望 flatMap(maxPublishers: .max(32))
施加一些背压,即仅在 maxPublishers
< 32 时从上游发布者 map
请求项目。这似乎不是这种情况,并且它会迅速填满 RAM 并延迟处理。
然后我尝试使用 buffer
运算符来在生产者和消费者之间引入背压,但是 Apple 文档太差了我不明白它的功能(更具体地说 prefechStrategy
参数).
所以我尝试了不同的组合,例如:
import Foundation
import Combine
let cancellable = (0..<1_000_000).publisher
.map(some_preprocessing)
.buffer(size: 32, prefetch: .byRequest, whenFull: .dropNewest)
.flatMap(maxPublishers: .max(32)) { request in
URLSession.dataTaskPublisher(for: request)
.map(\.data)
.catch { _ in
return Just(Data())
}
}
.sink { completion in
print(completion)
} receiveValue: { value in
print(value)
}
// Required in a command line tool
sleep(100)
尽管这似乎没有任何用处,flatMap
仍然请求尽可能多的元素。
这种情况下如何正确施加背压?也就是说,我需要上游 map
发布者“等待”下游发布者 flatMap
提出的需求,它应该只在空位时询问项目。
正如 所指出的,该问题似乎是一个 Combine 错误。使用 Publishers.Sequence
会导致以下运算符在继续之前累积发送到下游的每个值。
解决方法是对序列发布者进行类型擦除:
import Foundation
import Combine
let cancellable = (0..<1_000_000).publisher
.eraseToAnyPublisher() // <----
.map(some_preprocessing)
.flatMap(maxPublishers: .max(32)) { request in
URLSession.dataTaskPublisher(for: request)
.map(\.data)
.catch { _ in
return Just(Data())
}
}
.sink { completion in
print(completion)
} receiveValue: { value in
print(value)
}
// Required in a command line tool without running loop
sleep(.max)
我正在尝试使用 Combine 通过网络执行数百万个并发请求。这是我使用的幼稚方法的模型:
import Foundation
import Combine
let cancellable = (0..<1_000_000).publisher
.map(some_preprocessing)
.flatMap(maxPublishers: .max(32)) { request in
URLSession.dataTaskPublisher(for: request)
.map(\.data)
.catch { _ in
return Just(Data())
}
}
.sink { completion in
print(completion)
} receiveValue: { value in
print(value)
}
// Required in a command line tool
sleep(100)
此管道首先创建一个请求,该请求在 flatMap
中完成以限制错误。此外,flatMap
合并了多个请求,使它们有效地同时完成,这很棒。
问题是它实际上会同时发出 1,000,000 个请求,所以我添加了参数 maxPublishers
来限制 flatMap
中同时订阅的发布者数量。这种工作,只有32个publisher同时活跃,可惜some_preprocessing
还是会执行1,000,000次才flatMap
执行
我希望 flatMap(maxPublishers: .max(32))
施加一些背压,即仅在 maxPublishers
< 32 时从上游发布者 map
请求项目。这似乎不是这种情况,并且它会迅速填满 RAM 并延迟处理。
然后我尝试使用 buffer
运算符来在生产者和消费者之间引入背压,但是 Apple 文档太差了我不明白它的功能(更具体地说 prefechStrategy
参数).
所以我尝试了不同的组合,例如:
import Foundation
import Combine
let cancellable = (0..<1_000_000).publisher
.map(some_preprocessing)
.buffer(size: 32, prefetch: .byRequest, whenFull: .dropNewest)
.flatMap(maxPublishers: .max(32)) { request in
URLSession.dataTaskPublisher(for: request)
.map(\.data)
.catch { _ in
return Just(Data())
}
}
.sink { completion in
print(completion)
} receiveValue: { value in
print(value)
}
// Required in a command line tool
sleep(100)
尽管这似乎没有任何用处,flatMap
仍然请求尽可能多的元素。
这种情况下如何正确施加背压?也就是说,我需要上游 map
发布者“等待”下游发布者 flatMap
提出的需求,它应该只在空位时询问项目。
正如 Publishers.Sequence
会导致以下运算符在继续之前累积发送到下游的每个值。
解决方法是对序列发布者进行类型擦除:
import Foundation
import Combine
let cancellable = (0..<1_000_000).publisher
.eraseToAnyPublisher() // <----
.map(some_preprocessing)
.flatMap(maxPublishers: .max(32)) { request in
URLSession.dataTaskPublisher(for: request)
.map(\.data)
.catch { _ in
return Just(Data())
}
}
.sink { completion in
print(completion)
} receiveValue: { value in
print(value)
}
// Required in a command line tool without running loop
sleep(.max)