Swift 合并:将一个发布者的更新排在另一个发布者之后
Swift Combine: enqueue updates to one publisher behind another publisher
我遇到这样一种情况,我的代码需要进行一次网络调用以获取一堆项目,但在等待这些项目下来时,另一个网络调用可能会获取对这些项目的更新。我希望能够将这些次要结果排入队列,直到第一个结果完成。有没有办法通过 Combine 来实现?
重要的是,我等不及要发出第二个请求了。它实际上是与第一个请求同时建立的 websocket 的连接,并且更新来自我无法控制的 websocket。
更新
在仔细研究了 Matt 关于 Combine 的 book 之后,我选择了 .prepend()
。但正如 Matt 在评论中警告我的那样,.prepend()
甚至在第一个发布者完成后才订阅另一个发布者。这意味着我错过了之前发送的任何信号。我需要的是一个 Subject
将值排入队列,但也许这并不难做到。无论如何,这是我得到的:
最初我打算使用 .append()
,但我意识到使用 .prepend()
我可以避免保留对其中一个发布者的引用。所以这是我所拥有内容的简化版本。这可能有语法错误,因为我已经从我的(雇主的)代码中删减了它。
有 ItemFeed
,它处理获取项目列表并同时处理项目更新事件。后者可以在初始项目列表之前到达,因此必须通过 Combine 排序才能在它之后到达。我尝试通过将初始项目源添加到更新 PassthroughSubject
.
来做到这一点
下面是一个 XCTestCase
,它模拟长时间的初始项目加载,并在加载完成之前添加更新。它尝试订阅项目列表的更改,并尝试测试第一次更新是最初的 63 个项目,随后的更新是针对 64 个项目(在这种情况下,“更新”导致添加一个项目)。
不幸的是,虽然发布了初始列表,但更新从未到来。我也尝试删除 .output(at:)
运算符,但两个接收器只调用一次。
测试用例设置延迟“获取”并订阅 feed.items
中的更改后,它调用 feed.handleItemUpatedEvent
。这调用 ItemFeed.updateItems.send(_:)
,但不幸的是,它已被遗忘。
class
ItemFeed
{
typealias InitialItemsSource = Deferred<Future<[[String : Any]], Error>>
let updateItems = PassthroughSubject<[Item], Error>()
var funnel : AnyCancellable?
@Published var items = [Item]()
init(initialItemSource inSource: InitialItemsSource)
{
// Passthrough subject each time items are updated…
var pub = self.updateItems.eraseToAnyPublisher()
// Prepend the initial items we need to fetch…
let initialItems = source.tryMap { try [=11=].map { try Item(object: [=11=]) } }
pub = pub.prepend(initialItems).eraseToAnyPublisher()
// Sink on the funnel to add or update to self.items…
self.funnel =
pub.sink { inCompletion in
// Handle errors
}
receiveValue: {
self.update(items: inItems)
}
}
func handleItemUpdatedEvent(_ inItem: Item) {
self.updateItems.send([inItem])
}
func update(items inItems: [Item]) {
// Update or add inItems to self.items
}
}
class
ItemFeedTests : XCTestCase
{
func
testShouldUpdateItems()
throws
{
// Set up a mock source of items…
let source = fetchItems(named: "items", delay: 3.0) // 63 items
let expectation = XCTestExpectation(description: "testShouldUpdateItems")
expectation.expectedFulfillmentCount = 2
let feed = ItemFeed(initialItemSource: source)
let sub1 = feed.$items
.output(at: 0)
.receive(on: DispatchQueue.main)
.sink { inItems in
expectation.fulfill()
debugLog("Got first items: \(inItems.count)")
XCTAssertEqual(inItems.count, 63)
}
let sub2 = feed.$items
.output(at: 1)
.receive(on: DispatchQueue.main)
.sink { inItems in
expectation.fulfill()
debugLog("Got second items: \(inItems.count)")
XCTAssertEqual(inItems.count, 64)
}
// Send an update right away…
let item = try loadItem(named: "Item3")
feed.handleItemUpdatedEvent(item)
XCTAssertEqual(feed.items.count, 0) // Should be no items yet
// Wait for stuff to complete…
wait(for: [expectation], timeout: 10.0)
sub1.cancel() // Not necessary, but silence the compiler warning
sub2.cancel()
}
}
经过反复试验,我找到了解决方案。我创建了一个自定义的 Publisher 和 Subscription,它立即订阅它的上游发布者并开始排队元素(达到一些可指定的容量)。然后等待订阅者出现,并为该订阅者提供到目前为止的所有值,然后继续提供值。这是一个 marble 图表:
然后我将它与 .prepend()
结合使用,如下所示:
extension
Publisher
{
func
enqueue<P>(gatedBy inGate: P, capacity inCapacity: Int = .max)
-> AnyPublisher<Self.Output, Self.Failure>
where
P : Publisher,
P.Output == Output,
P.Failure == Failure
{
let qp = Publishers.Queueing(upstream: self, capacity: inCapacity)
let r = qp.prepend(inGate).eraseToAnyPublisher()
return r
}
}
这就是您使用它的方式……
func
testShouldReturnAllItemsInOrder()
{
let gate = PassthroughSubject<Int, Never>()
let stream = PassthroughSubject<Int, Never>()
var results = [Int]()
let sub = stream.enqueue(gatedBy: gate)
.sink
{ inElement in
debugLog("element: \(inElement)")
results.append(inElement)
}
stream.send(3)
stream.send(4)
stream.send(5)
XCTAssertEqual(results.count, 0)
gate.send(1)
gate.send(2)
gate.send(completion: .finished)
XCTAssertEqual(results.count, 5)
XCTAssertEqual(results, [1,2,3,4,5])
sub.cancel()
}
这会打印出您所期望的内容:
element: 1
element: 2
element: 3
element: 4
element: 5
它运行良好,因为创建 .enqueue(gatedBy:)
运算符会创建排队发布者 qp
,它会立即订阅 stream
并将其发送的任何值排入队列。然后它在 qp
上调用 .prepend()
,它首先订阅 gate
,并等待它完成。完成后,它会订阅 qp
,后者会立即为其提供所有排队的值,然后继续为其提供来自上游发布者的值。
这是我最终得到的 code。
//
// QueuingPublisher.swift
// Latency: Zero, LLC
//
// Created by Rick Mann on 2021-06-03.
//
import Combine
import Foundation
extension
Publishers
{
final
class
Queueing<Upstream: Publisher>: Publisher
{
typealias Output = Upstream.Output
typealias Failure = Upstream.Failure
private let upstream : Upstream
private let capacity : Int
private var queue : [Output] = [Output]()
private var subscription : QueueingSubscription<Queueing<Upstream>, Upstream>?
fileprivate var completion : Subscribers.Completion<Failure>? = nil
init(upstream inUpstream: Upstream, capacity inCapacity: Int)
{
self.upstream = inUpstream
self.capacity = inCapacity
// Subscribe to the upstream right away so we can start
// enqueueing values…
let sink = AnySubscriber { [=13=].request(.unlimited) }
receiveValue:
{ [weak self] (inValue: Output) -> Subscribers.Demand in
self?.relay(inValue)
return .none
}
receiveCompletion:
{ [weak self] (inCompletion: Subscribers.Completion<Failure>) in
self?.completion = inCompletion
self?.subscription?.complete(with: inCompletion)
}
inUpstream.subscribe(sink)
}
func
receive<S: Subscriber>(subscriber inSubscriber: S)
where
Failure == S.Failure,
Output == S.Input
{
let subscription = QueueingSubscription(publisher: self, subscriber: inSubscriber)
self.subscription = subscription
inSubscriber.receive(subscription: subscription)
}
/**
Return up to inDemand values.
*/
func
request(_ inDemand: Subscribers.Demand)
-> [Output]
{
let count = inDemand.min(self.queue.count)
let elements = Array(self.queue[..<count])
self.queue.removeFirst(count)
return elements
}
private
func
relay(_ inValue: Output)
{
// TODO: The Wenderlich example code checks to see if the upstream has completed,
// but I feel like want to send all the values we've gotten first?
// Save the new value…
self.queue.append(inValue)
// Discard the oldest if we’re over capacity…
if self.queue.count > self.capacity
{
self.queue.removeFirst()
}
// Send the buffer to our subscriber…
self.subscription?.dataAvailable()
}
final
class
QueueingSubscription<QP, Upstream> : Subscription
where
QP : Queueing<Upstream>
{
typealias Output = Upstream.Output
typealias Failure = Upstream.Failure
let publisher : QP
var subscriber : AnySubscriber<Output,Failure>? = nil
private var demand : Subscribers.Demand = .none
init<S>(publisher inP: QP,
subscriber inS: S)
where
S: Subscriber,
Failure == S.Failure,
Output == S.Input
{
self.publisher = inP
self.subscriber = AnySubscriber(inS)
}
func
request(_ inDemand: Subscribers.Demand)
{
self.demand += inDemand
emitAsNeeded()
}
func
cancel()
{
complete(with: .finished)
}
/**
Called by our publisher to let us know new
data has arrived.
*/
func
dataAvailable()
{
emitAsNeeded()
}
private
func
emitAsNeeded()
{
guard let subscriber = self.subscriber else { return }
let newValues = self.publisher.request(self.demand)
self.demand -= newValues.count
newValues.forEach
{
let nextDemand = subscriber.receive([=13=])
self.demand += nextDemand
}
if let completion = self.publisher.completion
{
complete(with: completion)
}
}
fileprivate
func
complete(with inCompletion: Subscribers.Completion<Failure>)
{
guard let subscriber = self.subscriber else { return }
self.subscriber = nil
subscriber.receive(completion: inCompletion)
}
}
}
} // extension Publishers
extension
Publisher
{
func
enqueue<P>(gatedBy inGate: P, capacity inCapacity: Int = .max)
-> AnyPublisher<Self.Output, Self.Failure>
where
P : Publisher,
P.Output == Output,
P.Failure == Failure
{
let qp = Publishers.Queueing(upstream: self, capacity: inCapacity)
let r = qp.prepend(inGate).eraseToAnyPublisher()
return r
}
}
extension
Subscribers.Demand
{
func
min(_ inValue: Int)
-> Int
{
if self == .unlimited
{
return inValue
}
return Swift.min(self.max!, inValue)
}
}
接受的答案实际上没有任何代码,但我能够找到解决方案。我最终创建了一个订阅“门发布者”的自定义发布者,并创建了一个为上游发布者创建接收器的订阅。我缓冲来自上游的值并根据需求发出门发布者值直到它完成,然后我切换到根据需求向下游发送缓冲区。棘手的部分是跟踪上游/网关发布者并将需求发送到正确的发布者。
我遇到这样一种情况,我的代码需要进行一次网络调用以获取一堆项目,但在等待这些项目下来时,另一个网络调用可能会获取对这些项目的更新。我希望能够将这些次要结果排入队列,直到第一个结果完成。有没有办法通过 Combine 来实现?
重要的是,我等不及要发出第二个请求了。它实际上是与第一个请求同时建立的 websocket 的连接,并且更新来自我无法控制的 websocket。
更新
在仔细研究了 Matt 关于 Combine 的 book 之后,我选择了 .prepend()
。但正如 Matt 在评论中警告我的那样,.prepend()
甚至在第一个发布者完成后才订阅另一个发布者。这意味着我错过了之前发送的任何信号。我需要的是一个 Subject
将值排入队列,但也许这并不难做到。无论如何,这是我得到的:
最初我打算使用 .append()
,但我意识到使用 .prepend()
我可以避免保留对其中一个发布者的引用。所以这是我所拥有内容的简化版本。这可能有语法错误,因为我已经从我的(雇主的)代码中删减了它。
有 ItemFeed
,它处理获取项目列表并同时处理项目更新事件。后者可以在初始项目列表之前到达,因此必须通过 Combine 排序才能在它之后到达。我尝试通过将初始项目源添加到更新 PassthroughSubject
.
下面是一个 XCTestCase
,它模拟长时间的初始项目加载,并在加载完成之前添加更新。它尝试订阅项目列表的更改,并尝试测试第一次更新是最初的 63 个项目,随后的更新是针对 64 个项目(在这种情况下,“更新”导致添加一个项目)。
不幸的是,虽然发布了初始列表,但更新从未到来。我也尝试删除 .output(at:)
运算符,但两个接收器只调用一次。
测试用例设置延迟“获取”并订阅 feed.items
中的更改后,它调用 feed.handleItemUpatedEvent
。这调用 ItemFeed.updateItems.send(_:)
,但不幸的是,它已被遗忘。
class
ItemFeed
{
typealias InitialItemsSource = Deferred<Future<[[String : Any]], Error>>
let updateItems = PassthroughSubject<[Item], Error>()
var funnel : AnyCancellable?
@Published var items = [Item]()
init(initialItemSource inSource: InitialItemsSource)
{
// Passthrough subject each time items are updated…
var pub = self.updateItems.eraseToAnyPublisher()
// Prepend the initial items we need to fetch…
let initialItems = source.tryMap { try [=11=].map { try Item(object: [=11=]) } }
pub = pub.prepend(initialItems).eraseToAnyPublisher()
// Sink on the funnel to add or update to self.items…
self.funnel =
pub.sink { inCompletion in
// Handle errors
}
receiveValue: {
self.update(items: inItems)
}
}
func handleItemUpdatedEvent(_ inItem: Item) {
self.updateItems.send([inItem])
}
func update(items inItems: [Item]) {
// Update or add inItems to self.items
}
}
class
ItemFeedTests : XCTestCase
{
func
testShouldUpdateItems()
throws
{
// Set up a mock source of items…
let source = fetchItems(named: "items", delay: 3.0) // 63 items
let expectation = XCTestExpectation(description: "testShouldUpdateItems")
expectation.expectedFulfillmentCount = 2
let feed = ItemFeed(initialItemSource: source)
let sub1 = feed.$items
.output(at: 0)
.receive(on: DispatchQueue.main)
.sink { inItems in
expectation.fulfill()
debugLog("Got first items: \(inItems.count)")
XCTAssertEqual(inItems.count, 63)
}
let sub2 = feed.$items
.output(at: 1)
.receive(on: DispatchQueue.main)
.sink { inItems in
expectation.fulfill()
debugLog("Got second items: \(inItems.count)")
XCTAssertEqual(inItems.count, 64)
}
// Send an update right away…
let item = try loadItem(named: "Item3")
feed.handleItemUpdatedEvent(item)
XCTAssertEqual(feed.items.count, 0) // Should be no items yet
// Wait for stuff to complete…
wait(for: [expectation], timeout: 10.0)
sub1.cancel() // Not necessary, but silence the compiler warning
sub2.cancel()
}
}
经过反复试验,我找到了解决方案。我创建了一个自定义的 Publisher 和 Subscription,它立即订阅它的上游发布者并开始排队元素(达到一些可指定的容量)。然后等待订阅者出现,并为该订阅者提供到目前为止的所有值,然后继续提供值。这是一个 marble 图表:
然后我将它与 .prepend()
结合使用,如下所示:
extension
Publisher
{
func
enqueue<P>(gatedBy inGate: P, capacity inCapacity: Int = .max)
-> AnyPublisher<Self.Output, Self.Failure>
where
P : Publisher,
P.Output == Output,
P.Failure == Failure
{
let qp = Publishers.Queueing(upstream: self, capacity: inCapacity)
let r = qp.prepend(inGate).eraseToAnyPublisher()
return r
}
}
这就是您使用它的方式……
func
testShouldReturnAllItemsInOrder()
{
let gate = PassthroughSubject<Int, Never>()
let stream = PassthroughSubject<Int, Never>()
var results = [Int]()
let sub = stream.enqueue(gatedBy: gate)
.sink
{ inElement in
debugLog("element: \(inElement)")
results.append(inElement)
}
stream.send(3)
stream.send(4)
stream.send(5)
XCTAssertEqual(results.count, 0)
gate.send(1)
gate.send(2)
gate.send(completion: .finished)
XCTAssertEqual(results.count, 5)
XCTAssertEqual(results, [1,2,3,4,5])
sub.cancel()
}
这会打印出您所期望的内容:
element: 1
element: 2
element: 3
element: 4
element: 5
它运行良好,因为创建 .enqueue(gatedBy:)
运算符会创建排队发布者 qp
,它会立即订阅 stream
并将其发送的任何值排入队列。然后它在 qp
上调用 .prepend()
,它首先订阅 gate
,并等待它完成。完成后,它会订阅 qp
,后者会立即为其提供所有排队的值,然后继续为其提供来自上游发布者的值。
这是我最终得到的 code。
//
// QueuingPublisher.swift
// Latency: Zero, LLC
//
// Created by Rick Mann on 2021-06-03.
//
import Combine
import Foundation
extension
Publishers
{
final
class
Queueing<Upstream: Publisher>: Publisher
{
typealias Output = Upstream.Output
typealias Failure = Upstream.Failure
private let upstream : Upstream
private let capacity : Int
private var queue : [Output] = [Output]()
private var subscription : QueueingSubscription<Queueing<Upstream>, Upstream>?
fileprivate var completion : Subscribers.Completion<Failure>? = nil
init(upstream inUpstream: Upstream, capacity inCapacity: Int)
{
self.upstream = inUpstream
self.capacity = inCapacity
// Subscribe to the upstream right away so we can start
// enqueueing values…
let sink = AnySubscriber { [=13=].request(.unlimited) }
receiveValue:
{ [weak self] (inValue: Output) -> Subscribers.Demand in
self?.relay(inValue)
return .none
}
receiveCompletion:
{ [weak self] (inCompletion: Subscribers.Completion<Failure>) in
self?.completion = inCompletion
self?.subscription?.complete(with: inCompletion)
}
inUpstream.subscribe(sink)
}
func
receive<S: Subscriber>(subscriber inSubscriber: S)
where
Failure == S.Failure,
Output == S.Input
{
let subscription = QueueingSubscription(publisher: self, subscriber: inSubscriber)
self.subscription = subscription
inSubscriber.receive(subscription: subscription)
}
/**
Return up to inDemand values.
*/
func
request(_ inDemand: Subscribers.Demand)
-> [Output]
{
let count = inDemand.min(self.queue.count)
let elements = Array(self.queue[..<count])
self.queue.removeFirst(count)
return elements
}
private
func
relay(_ inValue: Output)
{
// TODO: The Wenderlich example code checks to see if the upstream has completed,
// but I feel like want to send all the values we've gotten first?
// Save the new value…
self.queue.append(inValue)
// Discard the oldest if we’re over capacity…
if self.queue.count > self.capacity
{
self.queue.removeFirst()
}
// Send the buffer to our subscriber…
self.subscription?.dataAvailable()
}
final
class
QueueingSubscription<QP, Upstream> : Subscription
where
QP : Queueing<Upstream>
{
typealias Output = Upstream.Output
typealias Failure = Upstream.Failure
let publisher : QP
var subscriber : AnySubscriber<Output,Failure>? = nil
private var demand : Subscribers.Demand = .none
init<S>(publisher inP: QP,
subscriber inS: S)
where
S: Subscriber,
Failure == S.Failure,
Output == S.Input
{
self.publisher = inP
self.subscriber = AnySubscriber(inS)
}
func
request(_ inDemand: Subscribers.Demand)
{
self.demand += inDemand
emitAsNeeded()
}
func
cancel()
{
complete(with: .finished)
}
/**
Called by our publisher to let us know new
data has arrived.
*/
func
dataAvailable()
{
emitAsNeeded()
}
private
func
emitAsNeeded()
{
guard let subscriber = self.subscriber else { return }
let newValues = self.publisher.request(self.demand)
self.demand -= newValues.count
newValues.forEach
{
let nextDemand = subscriber.receive([=13=])
self.demand += nextDemand
}
if let completion = self.publisher.completion
{
complete(with: completion)
}
}
fileprivate
func
complete(with inCompletion: Subscribers.Completion<Failure>)
{
guard let subscriber = self.subscriber else { return }
self.subscriber = nil
subscriber.receive(completion: inCompletion)
}
}
}
} // extension Publishers
extension
Publisher
{
func
enqueue<P>(gatedBy inGate: P, capacity inCapacity: Int = .max)
-> AnyPublisher<Self.Output, Self.Failure>
where
P : Publisher,
P.Output == Output,
P.Failure == Failure
{
let qp = Publishers.Queueing(upstream: self, capacity: inCapacity)
let r = qp.prepend(inGate).eraseToAnyPublisher()
return r
}
}
extension
Subscribers.Demand
{
func
min(_ inValue: Int)
-> Int
{
if self == .unlimited
{
return inValue
}
return Swift.min(self.max!, inValue)
}
}
接受的答案实际上没有任何代码,但我能够找到解决方案。我最终创建了一个订阅“门发布者”的自定义发布者,并创建了一个为上游发布者创建接收器的订阅。我缓冲来自上游的值并根据需求发出门发布者值直到它完成,然后我切换到根据需求向下游发送缓冲区。棘手的部分是跟踪上游/网关发布者并将需求发送到正确的发布者。