Swift Combine:如何从发布者列表中创建一个发布者?
Swift Combine: How to create a single publisher from a list of publishers?
我想使用 Apple 的新 Combine 框架从列表中的每个元素发出多个请求。然后我想要一个减少所有响应的单一结果。基本上我想从发布者列表转到拥有响应列表的单个发布者。
我试过制作一个发布商列表,但我不知道如何将该列表缩减为一个发布商。我试过制作一个包含列表的发布者,但我无法平面映射发布者列表。
请看"createIngredients"函数
func createIngredient(ingredient: Ingredient) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
return apollo.performPub(mutation: CreateIngredientMutation(name: ingredient.name, optionalProduct: ingredient.productId, quantity: ingredient.quantity, unit: ingredient.unit))
.eraseToAnyPublisher()
}
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
// first attempt
let results = ingredients
.map(createIngredient)
// results = [AnyPublisher<CreateIngredientMutation.Data, Error>]
// second attempt
return Publishers.Just(ingredients)
.eraseToAnyPublisher()
.flatMap { (list: [Ingredient]) -> Publisher<[CreateIngredientMutation.Data], Error> in
return list.map(createIngredient) // [AnyPublisher<CreateIngredientMutation.Data, Error>]
}
}
我不确定如何获取发布者数组并将其转换为包含数组的发布者。
类型“[AnyPublisher]”的结果值不符合闭包结果类型'Publisher'
本质上,在您的特定情况下,您会看到类似这样的内容:
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
Publishers.MergeMany(ingredients.map(createIngredient(ingredient:)))
.collect()
.eraseToAnyPublisher()
}
这 'collects' 由上游发布者生成的所有元素,并且 - 一旦它们全部完成 - 生成一个包含所有结果的数组并最终完成自身。
请记住,如果其中一个上游发布者失败——或产生多个结果——元素的数量可能与订阅者的数量不匹配,因此您可能需要额外的运营商来缓解这种情况,具体取决于您的情况。
更通用的答案,您可以使用 EntwineTest framework:
对其进行测试
import XCTest
import Combine
import EntwineTest
final class MyTests: XCTestCase {
func testCreateArrayFromArrayOfPublishers() {
typealias SimplePublisher = Just<Int>
// we'll create our 'list of publishers' here. Each publisher emits a single
// Int and then completes successfully – using the `Just` publisher.
let publishers: [SimplePublisher] = [
SimplePublisher(1),
SimplePublisher(2),
SimplePublisher(3),
]
// we'll turn our array of publishers into a single merged publisher
let publisherOfPublishers = Publishers.MergeMany(publishers)
// Then we `collect` all the individual publisher elements results into
// a single array
let finalPublisher = publisherOfPublishers.collect()
// Let's test what we expect to happen, will happen.
// We'll create a scheduler to run our test on
let testScheduler = TestScheduler()
// Then we'll start a test. Our test will subscribe to our publisher
// at a virtual time of 200, and cancel the subscription at 900
let testableSubscriber = testScheduler.start { finalPublisher }
// we're expecting that, immediately upon subscription, our results will
// arrive. This is because we're using `just` type publishers which
// dispatch their contents as soon as they're subscribed to
XCTAssertEqual(testableSubscriber.recordedOutput, [
(200, .subscription), // we're expecting to subscribe at 200
(200, .input([1, 2, 3])), // then receive an array of results immediately
(200, .completion(.finished)), // the `collect` operator finishes immediately after completion
])
}
}
我认为 Publishers.MergeMany
在这里可以提供帮助。在您的示例中,您可以像这样使用它:
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
let publishers = ingredients.map(createIngredient(ingredient:))
return Publishers.MergeMany(publishers).eraseToAnyPublisher()
}
这将为您提供一个发布者,向您发送 Output
的单个值。
但是,如果您特别希望在所有发布者完成后立即将 Output
放入一个数组中,您可以使用 collect()
和 MergeMany
:
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
let publishers = ingredients.map(createIngredient(ingredient:))
return Publishers.MergeMany(publishers).collect().eraseToAnyPublisher()
}
如果您愿意,您可以将上述任何一个示例简化为一行,即:
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
Publishers.MergeMany(ingredients.map(createIngredient(ingredient:))).eraseToAnyPublisher()
}
您还可以在 Sequence
上定义自己的自定义 merge()
扩展方法,并使用它来稍微简化代码:
extension Sequence where Element: Publisher {
func merge() -> Publishers.MergeMany<Element> {
Publishers.MergeMany(self)
}
}
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
ingredients.map(createIngredient).merge().eraseToAnyPublisher()
}
添加 Tricky 的答案,这是一个保留数组中元素顺序的解决方案。
它通过整个链为每个元素传递一个索引,并根据索引对收集到的数组进行排序。
由于排序,复杂度应该是 O(n log n)。
import Combine
extension Publishers {
private struct EnumeratedElement<T> {
let index: Int
let element: T
init(index: Int, element: T) {
self.index = index
self.element = element
}
init(_ enumeratedSequence: EnumeratedSequence<[T]>.Iterator.Element) {
index = enumeratedSequence.offset
element = enumeratedSequence.element
}
}
static func mergeMappedRetainingOrder<InputType, OutputType>(
_ inputArray: [InputType],
mapTransform: (InputType) -> AnyPublisher<OutputType, Error>
) -> AnyPublisher<[OutputType], Error> {
let enumeratedInputArray = inputArray.enumerated().map(EnumeratedElement.init)
let enumeratedMapTransform: (EnumeratedElement<InputType>) -> AnyPublisher<EnumeratedElement<OutputType>, Error> = { enumeratedInput in
mapTransform(enumeratedInput.element)
.map { EnumeratedElement(index: enumeratedInput.index, element: [=10=])}
.eraseToAnyPublisher()
}
let sortEnumeratedOutputArrayByIndex: ([EnumeratedElement<OutputType>]) -> [EnumeratedElement<OutputType>] = { enumeratedOutputArray in
enumeratedOutputArray.sorted { [=10=].index < .index }
}
let transformToNonEnumeratedArray: ([EnumeratedElement<OutputType>]) -> [OutputType] = {
[=10=].map { [=10=].element }
}
return Publishers.MergeMany(enumeratedInputArray.map(enumeratedMapTransform))
.collect()
.map(sortEnumeratedOutputArrayByIndex)
.map(transformToNonEnumeratedArray)
.eraseToAnyPublisher()
}
}
解决方案的单元测试:
import XCTest
import Combine
final class PublishersExtensionsTests: XCTestCase {
// MARK: - Private properties
private var cancellables = Set<AnyCancellable>()
// MARK: - Tests
func test_mergeMappedRetainingOrder() {
let expectation = expectation(description: "mergeMappedRetainingOrder publisher")
let numbers = (1...100).map { _ in Int.random(in: 1...3) }
let mapTransform: (Int) -> AnyPublisher<Int, Error> = {
let delayTimeInterval = RunLoop.SchedulerTimeType.Stride(Double([=11=]))
return Just([=11=])
.delay(for: delayTimeInterval, scheduler: RunLoop.main)
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
let resultNumbersPublisher = Publishers.mergeMappedRetainingOrder(numbers, mapTransform: mapTransform)
resultNumbersPublisher.sink(receiveCompletion: { _ in }, receiveValue: { resultNumbers in
XCTAssertTrue(numbers == resultNumbers)
expectation.fulfill()
}).store(in: &cancellables)
waitForExpectations(timeout: 5)
}
}
一行即可完成:
.flatMap(Publishers.Sequence.init(sequence:))
我想使用 Apple 的新 Combine 框架从列表中的每个元素发出多个请求。然后我想要一个减少所有响应的单一结果。基本上我想从发布者列表转到拥有响应列表的单个发布者。
我试过制作一个发布商列表,但我不知道如何将该列表缩减为一个发布商。我试过制作一个包含列表的发布者,但我无法平面映射发布者列表。
请看"createIngredients"函数
func createIngredient(ingredient: Ingredient) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
return apollo.performPub(mutation: CreateIngredientMutation(name: ingredient.name, optionalProduct: ingredient.productId, quantity: ingredient.quantity, unit: ingredient.unit))
.eraseToAnyPublisher()
}
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
// first attempt
let results = ingredients
.map(createIngredient)
// results = [AnyPublisher<CreateIngredientMutation.Data, Error>]
// second attempt
return Publishers.Just(ingredients)
.eraseToAnyPublisher()
.flatMap { (list: [Ingredient]) -> Publisher<[CreateIngredientMutation.Data], Error> in
return list.map(createIngredient) // [AnyPublisher<CreateIngredientMutation.Data, Error>]
}
}
我不确定如何获取发布者数组并将其转换为包含数组的发布者。
类型“[AnyPublisher]”的结果值不符合闭包结果类型'Publisher'
本质上,在您的特定情况下,您会看到类似这样的内容:
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
Publishers.MergeMany(ingredients.map(createIngredient(ingredient:)))
.collect()
.eraseToAnyPublisher()
}
这 'collects' 由上游发布者生成的所有元素,并且 - 一旦它们全部完成 - 生成一个包含所有结果的数组并最终完成自身。
请记住,如果其中一个上游发布者失败——或产生多个结果——元素的数量可能与订阅者的数量不匹配,因此您可能需要额外的运营商来缓解这种情况,具体取决于您的情况。
更通用的答案,您可以使用 EntwineTest framework:
对其进行测试import XCTest
import Combine
import EntwineTest
final class MyTests: XCTestCase {
func testCreateArrayFromArrayOfPublishers() {
typealias SimplePublisher = Just<Int>
// we'll create our 'list of publishers' here. Each publisher emits a single
// Int and then completes successfully – using the `Just` publisher.
let publishers: [SimplePublisher] = [
SimplePublisher(1),
SimplePublisher(2),
SimplePublisher(3),
]
// we'll turn our array of publishers into a single merged publisher
let publisherOfPublishers = Publishers.MergeMany(publishers)
// Then we `collect` all the individual publisher elements results into
// a single array
let finalPublisher = publisherOfPublishers.collect()
// Let's test what we expect to happen, will happen.
// We'll create a scheduler to run our test on
let testScheduler = TestScheduler()
// Then we'll start a test. Our test will subscribe to our publisher
// at a virtual time of 200, and cancel the subscription at 900
let testableSubscriber = testScheduler.start { finalPublisher }
// we're expecting that, immediately upon subscription, our results will
// arrive. This is because we're using `just` type publishers which
// dispatch their contents as soon as they're subscribed to
XCTAssertEqual(testableSubscriber.recordedOutput, [
(200, .subscription), // we're expecting to subscribe at 200
(200, .input([1, 2, 3])), // then receive an array of results immediately
(200, .completion(.finished)), // the `collect` operator finishes immediately after completion
])
}
}
我认为 Publishers.MergeMany
在这里可以提供帮助。在您的示例中,您可以像这样使用它:
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
let publishers = ingredients.map(createIngredient(ingredient:))
return Publishers.MergeMany(publishers).eraseToAnyPublisher()
}
这将为您提供一个发布者,向您发送 Output
的单个值。
但是,如果您特别希望在所有发布者完成后立即将 Output
放入一个数组中,您可以使用 collect()
和 MergeMany
:
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
let publishers = ingredients.map(createIngredient(ingredient:))
return Publishers.MergeMany(publishers).collect().eraseToAnyPublisher()
}
如果您愿意,您可以将上述任何一个示例简化为一行,即:
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
Publishers.MergeMany(ingredients.map(createIngredient(ingredient:))).eraseToAnyPublisher()
}
您还可以在 Sequence
上定义自己的自定义 merge()
扩展方法,并使用它来稍微简化代码:
extension Sequence where Element: Publisher {
func merge() -> Publishers.MergeMany<Element> {
Publishers.MergeMany(self)
}
}
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
ingredients.map(createIngredient).merge().eraseToAnyPublisher()
}
添加 Tricky 的答案,这是一个保留数组中元素顺序的解决方案。 它通过整个链为每个元素传递一个索引,并根据索引对收集到的数组进行排序。
由于排序,复杂度应该是 O(n log n)。
import Combine
extension Publishers {
private struct EnumeratedElement<T> {
let index: Int
let element: T
init(index: Int, element: T) {
self.index = index
self.element = element
}
init(_ enumeratedSequence: EnumeratedSequence<[T]>.Iterator.Element) {
index = enumeratedSequence.offset
element = enumeratedSequence.element
}
}
static func mergeMappedRetainingOrder<InputType, OutputType>(
_ inputArray: [InputType],
mapTransform: (InputType) -> AnyPublisher<OutputType, Error>
) -> AnyPublisher<[OutputType], Error> {
let enumeratedInputArray = inputArray.enumerated().map(EnumeratedElement.init)
let enumeratedMapTransform: (EnumeratedElement<InputType>) -> AnyPublisher<EnumeratedElement<OutputType>, Error> = { enumeratedInput in
mapTransform(enumeratedInput.element)
.map { EnumeratedElement(index: enumeratedInput.index, element: [=10=])}
.eraseToAnyPublisher()
}
let sortEnumeratedOutputArrayByIndex: ([EnumeratedElement<OutputType>]) -> [EnumeratedElement<OutputType>] = { enumeratedOutputArray in
enumeratedOutputArray.sorted { [=10=].index < .index }
}
let transformToNonEnumeratedArray: ([EnumeratedElement<OutputType>]) -> [OutputType] = {
[=10=].map { [=10=].element }
}
return Publishers.MergeMany(enumeratedInputArray.map(enumeratedMapTransform))
.collect()
.map(sortEnumeratedOutputArrayByIndex)
.map(transformToNonEnumeratedArray)
.eraseToAnyPublisher()
}
}
解决方案的单元测试:
import XCTest
import Combine
final class PublishersExtensionsTests: XCTestCase {
// MARK: - Private properties
private var cancellables = Set<AnyCancellable>()
// MARK: - Tests
func test_mergeMappedRetainingOrder() {
let expectation = expectation(description: "mergeMappedRetainingOrder publisher")
let numbers = (1...100).map { _ in Int.random(in: 1...3) }
let mapTransform: (Int) -> AnyPublisher<Int, Error> = {
let delayTimeInterval = RunLoop.SchedulerTimeType.Stride(Double([=11=]))
return Just([=11=])
.delay(for: delayTimeInterval, scheduler: RunLoop.main)
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
let resultNumbersPublisher = Publishers.mergeMappedRetainingOrder(numbers, mapTransform: mapTransform)
resultNumbersPublisher.sink(receiveCompletion: { _ in }, receiveValue: { resultNumbers in
XCTAssertTrue(numbers == resultNumbers)
expectation.fulfill()
}).store(in: &cancellables)
waitForExpectations(timeout: 5)
}
}
一行即可完成:
.flatMap(Publishers.Sequence.init(sequence:))