Swift 合并:将一个发布者的更新排在另一个发布者之后

Swift Combine: enqueue updates to one publisher behind another publisher

我遇到这样一种情况,我的代码需要进行一次网络调用以获取一堆项目,但在等待这些项目下来时,另一个网络调用可能会获取对这些项目的更新。我希望能够将这些次要结果排入队列,直到第一个结果完成。有没有办法通过 Combine 来实现?

重要的是,我等不及要发出第二个请求了。它实际上是与第一个请求同时建立的 websocket 的连接,并且更新来自我无法控制的 websocket。

更新

在仔细研究了 Matt 关于 Combine 的 book 之后,我选择了 .prepend()。但正如 Matt 在评论中警告我的那样,.prepend() 甚至在第一个发布者完成后才订阅另一个发布者。这意味着我错过了之前发送的任何信号。我需要的是一个 Subject 将值排入队列,但也许这并不难做到。无论如何,这是我得到的:

最初我打算使用 .append(),但我意识到使用 .prepend() 我可以避免保留对其中一个发布者的引用。所以这是我所拥有内容的简化版本。这可能有语法错误,因为我已经从我的(雇主的)代码中删减了它。

ItemFeed,它处理获取项目列表并同时处理项目更新事件。后者可以在初始项目列表之前到达,因此必须通过 Combine 排序才能在它之后到达。我尝试通过将初始项目源添加到更新 PassthroughSubject.

来做到这一点

下面是一个 XCTestCase,它模拟长时间的初始项目加载,并在加载完成之前添加更新。它尝试订阅项目列表的更改,并尝试测试第一次更新是最初的 63 个项目,随后的更新是针对 64 个项目(在这种情况下,“更新”导致添加一个项目)。

不幸的是,虽然发布了初始列表,但更新从未到来。我也尝试删除 .output(at:) 运算符,但两个接收器只调用一次。

测试用例设置延迟“获取”并订阅 feed.items 中的更改后,它调用 feed.handleItemUpatedEvent。这调用 ItemFeed.updateItems.send(_:),但不幸的是,它已被遗忘。

class
ItemFeed
{
    typealias   InitialItemsSource      =   Deferred<Future<[[String : Any]], Error>>
    
                let updateItems         =   PassthroughSubject<[Item], Error>()
                var funnel              :   AnyCancellable?
    
    @Published  var items               =   [Item]()
    
    
    
    init(initialItemSource inSource: InitialItemsSource)
    {
        //  Passthrough subject each time items are updated…
        
        var pub = self.updateItems.eraseToAnyPublisher()
        
        //  Prepend the initial items we need to fetch…
        
        let initialItems = source.tryMap { try [=11=].map { try Item(object: [=11=]) } }
        pub = pub.prepend(initialItems).eraseToAnyPublisher()
        
        //  Sink on the funnel to add or update to self.items…
        
        self.funnel =
            pub.sink { inCompletion in
                //  Handle errors
            }
            receiveValue: {
                self.update(items: inItems)
            }
    }
    
    func handleItemUpdatedEvent(_ inItem: Item) {
        self.updateItems.send([inItem])
    }
    
    func update(items inItems: [Item]) {
        //  Update or add inItems to self.items
    }
}

class
ItemFeedTests : XCTestCase
{
    func
    testShouldUpdateItems()
        throws
    {
        //  Set up a mock source of items…
        
        let source = fetchItems(named: "items", delay: 3.0)      //  63 items
        
        let expectation = XCTestExpectation(description: "testShouldUpdateItems")
        expectation.expectedFulfillmentCount = 2
        
        let feed = ItemFeed(initialItemSource: source)
        
        let sub1 = feed.$items
                    .output(at: 0)
                    .receive(on: DispatchQueue.main)
                    .sink { inItems in
                        expectation.fulfill()
                        
                        debugLog("Got first items: \(inItems.count)")
                        XCTAssertEqual(inItems.count, 63)
                    }
        
        let sub2 = feed.$items
                    .output(at: 1)
                    .receive(on: DispatchQueue.main)
                    .sink { inItems in
                        expectation.fulfill()

                        debugLog("Got second items: \(inItems.count)")
                        XCTAssertEqual(inItems.count, 64)
                    }
        
        //  Send an update right away…
        
        let item = try loadItem(named: "Item3")
        feed.handleItemUpdatedEvent(item)
        
        XCTAssertEqual(feed.items.count, 0)         //  Should be no items yet
        
        //  Wait for stuff to complete…
        
        wait(for: [expectation], timeout: 10.0)
        
        sub1.cancel()           //  Not necessary, but silence the compiler warning
        sub2.cancel()
    }
}   

经过反复试验,我找到了解决方案。我创建了一个自定义的 Publisher 和 Subscription,它立即订阅它的上游发布者并开始排队元素(达到一些可指定的容量)。然后等待订阅者出现,并为该订阅者提供到目前为止的所有值,然后继续提供值。这是一个 marble 图表:

然后我将它与 .prepend() 结合使用,如下所示:

extension
Publisher
{
    func
    enqueue<P>(gatedBy inGate: P, capacity inCapacity: Int = .max)
        -> AnyPublisher<Self.Output, Self.Failure>
        where
            P : Publisher,
            P.Output == Output,
            P.Failure == Failure
    {
        let qp = Publishers.Queueing(upstream: self, capacity: inCapacity)
        let r = qp.prepend(inGate).eraseToAnyPublisher()
        return r
    }
}

这就是您使用它的方式……

func
testShouldReturnAllItemsInOrder()
{
    let gate = PassthroughSubject<Int, Never>()
    let stream = PassthroughSubject<Int, Never>()
    
    var results = [Int]()
    
    let sub = stream.enqueue(gatedBy: gate)
                .sink
                { inElement in
                    debugLog("element: \(inElement)")
                    results.append(inElement)
                }
    stream.send(3)
    stream.send(4)
    stream.send(5)
    
    XCTAssertEqual(results.count, 0)
    
    gate.send(1)
    gate.send(2)
    gate.send(completion: .finished)
    
    XCTAssertEqual(results.count, 5)
    XCTAssertEqual(results, [1,2,3,4,5])
    
    sub.cancel()
}

这会打印出您所期望的内容:

element: 1
element: 2
element: 3
element: 4
element: 5

它运行良好,因为创建 .enqueue(gatedBy:) 运算符会创建排队发布者 qp,它会立即订阅 stream 并将其发送的任何值排入队列。然后它在 qp 上调用 .prepend(),它首先订阅 gate,并等待它完成。完成后,它会订阅 qp,后者会立即为其提供所有排队的值,然后继续为其提供来自上游发布者的值。

这是我最终得到的 code

//
//  QueuingPublisher.swift
//  Latency: Zero, LLC
//
//  Created by Rick Mann on 2021-06-03.
//

import Combine
import Foundation



extension
Publishers
{
    final
    class
    Queueing<Upstream: Publisher>: Publisher
    {
        typealias Output = Upstream.Output
        typealias Failure = Upstream.Failure
        
        private     let upstream            :   Upstream
        private     let capacity            :   Int
        private     var queue               :   [Output]                                                =   [Output]()
        private     var subscription        :   QueueingSubscription<Queueing<Upstream>, Upstream>?
        fileprivate var completion          :   Subscribers.Completion<Failure>?                        =   nil
        
        init(upstream inUpstream: Upstream, capacity inCapacity: Int)
        {
            self.upstream = inUpstream
            self.capacity = inCapacity
            
            //  Subscribe to the upstream right away so we can start
            //  enqueueing values…
            
            let sink = AnySubscriber { [=13=].request(.unlimited) }
                        receiveValue:
                        { [weak self] (inValue: Output) -> Subscribers.Demand in
                            self?.relay(inValue)
                            return .none
                        }
                        receiveCompletion:
                        { [weak self] (inCompletion: Subscribers.Completion<Failure>) in
                            self?.completion = inCompletion
                            self?.subscription?.complete(with: inCompletion)
                        }
            inUpstream.subscribe(sink)
        }
        
        func
        receive<S: Subscriber>(subscriber inSubscriber: S)
            where
                Failure == S.Failure,
                Output == S.Input
        {
            let subscription = QueueingSubscription(publisher: self, subscriber: inSubscriber)
            self.subscription = subscription
            inSubscriber.receive(subscription: subscription)
        }

        /**
            Return up to inDemand values.
        */
        
        func
        request(_ inDemand: Subscribers.Demand)
            -> [Output]
        {
            let count = inDemand.min(self.queue.count)
            let elements = Array(self.queue[..<count])
            self.queue.removeFirst(count)
            return elements
        }
        
        private
        func
        relay(_ inValue: Output)
        {
            //  TODO: The Wenderlich example code checks to see if the upstream has completed,
            //          but I feel like want to send all the values we've gotten first?
            
            //  Save the new value…
            
            self.queue.append(inValue)
            
            //  Discard the oldest if we’re over capacity…
            
            if self.queue.count > self.capacity
            {
                self.queue.removeFirst()
            }
            
            //  Send the buffer to our subscriber…
            
            self.subscription?.dataAvailable()
        }
        

        final
        class
        QueueingSubscription<QP, Upstream> : Subscription
            where
                QP : Queueing<Upstream>
        {
            typealias Output = Upstream.Output
            typealias Failure = Upstream.Failure
            
                    let publisher           :   QP
                    var subscriber          :   AnySubscriber<Output,Failure>?              =   nil
            private var demand              :   Subscribers.Demand                          =   .none
            
            init<S>(publisher inP: QP,
                    subscriber inS: S)
                where
                    S: Subscriber,
                    Failure == S.Failure,
                    Output == S.Input
            {
                self.publisher = inP
                self.subscriber = AnySubscriber(inS)
            }
            
            func
            request(_ inDemand: Subscribers.Demand)
            {
                self.demand += inDemand
                emitAsNeeded()
            }
            
            func
            cancel()
            {
                complete(with: .finished)
            }
            
            /**
                Called by our publisher to let us know new
                data has arrived.
            */
            
            func
            dataAvailable()
            {
                emitAsNeeded()
            }
            
            private
            func
            emitAsNeeded()
            {
                guard let subscriber = self.subscriber else { return }
                
                let newValues = self.publisher.request(self.demand)
                self.demand -= newValues.count
                newValues.forEach
                {
                    let nextDemand = subscriber.receive([=13=])
                    self.demand += nextDemand
                }
                
                if let completion = self.publisher.completion
                {
                    complete(with: completion)
                }
            }
            
            fileprivate
            func
            complete(with inCompletion: Subscribers.Completion<Failure>)
            {
                guard let subscriber = self.subscriber else { return }
                self.subscriber = nil
                
                subscriber.receive(completion: inCompletion)
            }
        }
    }
}   //  extension Publishers


extension
Publisher
{
    func
    enqueue<P>(gatedBy inGate: P, capacity inCapacity: Int = .max)
        -> AnyPublisher<Self.Output, Self.Failure>
        where
            P : Publisher,
            P.Output == Output,
            P.Failure == Failure
    {
        let qp = Publishers.Queueing(upstream: self, capacity: inCapacity)
        let r = qp.prepend(inGate).eraseToAnyPublisher()
        return r
    }
}

extension
Subscribers.Demand
{
    func
    min(_ inValue: Int)
        -> Int
    {
        if self == .unlimited
        {
            return inValue
        }
        
        return Swift.min(self.max!, inValue)
    }
}

接受的答案实际上没有任何代码,但我能够找到解决方案。我最终创建了一个订阅“门发布者”的自定义发布者,并创建了一个为上游发布者创建接收器的订阅。我缓冲来自上游的值并根据需求发出门发布者值直到它完成,然后我切换到根据需求向下游发送缓冲区。棘手的部分是跟踪上游/网关发布者并将需求发送到正确的发布者。