Swift Combine:是否存在 `prefix(untilCompletionFrom:)`?

Swift Combine: `prefix(untilCompletionFrom)`?

RxJava 有一个 takeUntil 运算符,文档将其描述为:

discard any items emitted by an Observable after a second Observable emits an item or terminates

最后一部分(即在第二个 Observable 终止时停止)正是我想用 Combine 实现的,但我还没有找到任何等效的运算符。我找到的唯一类似的运算符是 `prefix(untilOutputFrom:)`,其文档写道:

Republishes elements until another publisher emits an element.

所以给出:

fooPublisher.prefix(untilOutputFrom: barPublisher)

它的行为并不符合我的需求,因为它只在 barPublisher 发出一个元素时才结束。而我想要的是一个在 barPublisher 完成时也随之完成的运算符。

我是不是漏掉了什么?我想要的运算符是否其实已经以其他名称存在?

我最终自己实现了这个运算符。事实上,我创建了五个运算符,它们都基于相同的共享 (internal) 函数。我为它们添加了一些单元测试,它们似乎工作正常。

如果您发现任何 bug、可改进之处或更好的解决方案,请告诉我。

用法

// Finish when `barPublisher` finishes normally (`.finish`).
fooPublisher.prefix(untilFinishFrom: barPublisher)

// Finish when `barPublisher` emits an output OR finishes normally.
fooPublisher.prefix(untilOutputOrFinishFrom: barPublisher)

// Finish when `barPublisher` completes, either normally (`.finish`) OR with a `.failure`.
fooPublisher.prefix(untilCompletionFrom: barPublisher)

// Finish when `barPublisher` emits an output OR completes (`.finish` OR `.failure`).
fooPublisher.prefix(untilOutputOrCompletionFrom: barPublisher)

// Finish when `barPublisher` completes with a `.failure`.
// (I'm not so sure how useful this is... it might be better to handle it with one of
// the operators working with errors.)
fooPublisher.prefix(untilFailureFrom: barPublisher)

解决方案

实施

internal extension Publisher {
    /// Republishes this publisher's elements until `completionTriggeringPublisher` produces
    /// one of the events selected by `completionTriggerOptions`, then finishes.
    ///
    /// The trigger publisher is subscribed lazily, once per downstream subscriber, as part of
    /// the returned pipeline. Nothing is subscribed when the operator is merely constructed,
    /// so a trigger that terminates synchronously on subscription is not missed and no
    /// subscription state is shared between subscribers.
    ///
    /// - Parameters:
    ///   - completionTriggeringPublisher: The publisher whose events may end the sequence.
    ///     Its output values and error values are never forwarded downstream.
    ///   - completionTriggerOptions: Which trigger events (`.output`, `.finish`, `.failure`)
    ///     end the sequence. Events that are not selected are ignored.
    /// - Returns: A type-erased publisher with this publisher's `Output` and `Failure`.
    func prefix<CompletionTrigger>(
        untilEventFrom completionTriggeringPublisher: CompletionTrigger,
        completionTriggerOptions: Publishers.CompletionTriggerOptions
    ) -> AnyPublisher<Output, Failure> where CompletionTrigger: Publisher {

        guard completionTriggerOptions != .output else {
            // Output-only is exactly what Combine's bundled operator provides.
            return prefix(untilOutputFrom: completionTriggeringPublisher).eraseToAnyPublisher()
        }

        // Re-express every trigger event as an element describing its kind, so that the
        // bundled `prefix(untilOutputFrom:)` can react to terminations as well as outputs.
        let stopSignal = completionTriggeringPublisher
            // An emitted element becomes an `.output` marker.
            .map { _ in Publishers.CompletionTriggerOptions.output }
            // A normal finish emits a `.finish` marker before the stream ends.
            .append(Publishers.CompletionTriggerOptions.finish)
            // A failure is replaced by a `.failure` marker; `append` above only fires on a
            // normal finish, so a failing trigger never yields a `.finish` marker.
            .catch { _ in Just(Publishers.CompletionTriggerOptions.failure) }
            // Only the kinds the caller asked for are allowed to end the sequence.
            .filter { completionTriggerOptions.contains($0) }

        return prefix(untilOutputFrom: stopSignal).eraseToAnyPublisher()
    }
}

助手

// MARK: Publishers + CompletionTriggerOptions
public extension Publishers {
    /// A set of event kinds, stored as bit flags in an `Int`, used to select which events of
    /// a trigger publisher should end a `prefix(until…From:)` sequence.
    struct CompletionTriggerOptions: OptionSet {
        public typealias RawValue = Int

        /// The bit mask backing this option set.
        public let rawValue: RawValue

        /// Creates an option set from a raw bit mask.
        public init(rawValue: RawValue) { self.rawValue = rawValue }
    }
}

public extension Publishers.CompletionTriggerOptions {
    /// The trigger publisher emitted an element.
    static let output = Self(rawValue: 0b001)

    /// The trigger publisher finished normally.
    static let finish = Self(rawValue: 0b010)

    /// The trigger publisher terminated with a failure.
    static let failure = Self(rawValue: 0b100)

    /// Either kind of termination: `.finish` or `.failure`.
    static let completion: Self = Self.finish.union(.failure)

    /// Every event kind: `.output`, `.finish` and `.failure`.
    static let all: Self = Self.completion.union(.output)
}

运算符

public extension Publisher {

    /// Republishes elements until the trigger publisher completes, either by finishing
    /// normally or by failing.
    ///
    /// Forwards to `prefix(untilEventFrom:completionTriggerOptions:)` with `.completion`.
    func prefix<CompletionTrigger: Publisher>(
        untilCompletionFrom completionTriggeringPublisher: CompletionTrigger
    ) -> AnyPublisher<Output, Failure> {
        let options: Publishers.CompletionTriggerOptions = .completion
        return prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: options)
    }

    /// Republishes elements until the trigger publisher finishes normally.
    ///
    /// Forwards to `prefix(untilEventFrom:completionTriggerOptions:)` with `.finish`.
    func prefix<CompletionTrigger: Publisher>(
        untilFinishFrom completionTriggeringPublisher: CompletionTrigger
    ) -> AnyPublisher<Output, Failure> {
        let options: Publishers.CompletionTriggerOptions = .finish
        return prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: options)
    }

    /// Republishes elements until the trigger publisher terminates with a failure.
    ///
    /// Forwards to `prefix(untilEventFrom:completionTriggerOptions:)` with `.failure`.
    func prefix<CompletionTrigger: Publisher>(
        untilFailureFrom completionTriggeringPublisher: CompletionTrigger
    ) -> AnyPublisher<Output, Failure> {
        let options: Publishers.CompletionTriggerOptions = .failure
        return prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: options)
    }

    /// Republishes elements until the trigger publisher emits an element or finishes normally.
    ///
    /// Forwards to `prefix(untilEventFrom:completionTriggerOptions:)` with `.output` and `.finish`.
    func prefix<CompletionTrigger: Publisher>(
        untilOutputOrFinishFrom completionTriggeringPublisher: CompletionTrigger
    ) -> AnyPublisher<Output, Failure> {
        let options: Publishers.CompletionTriggerOptions = [.finish, .output]
        return prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: options)
    }

    /// Republishes elements until the trigger publisher emits an element, finishes normally,
    /// or fails.
    ///
    /// Forwards to `prefix(untilEventFrom:completionTriggerOptions:)` with `.all`, which is
    /// `.output`, `.finish` and `.failure` combined.
    func prefix<CompletionTrigger: Publisher>(
        untilOutputOrCompletionFrom completionTriggeringPublisher: CompletionTrigger
    ) -> AnyPublisher<Output, Failure> {
        let options: Publishers.CompletionTriggerOptions = .all
        return prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: options)
    }
}

单元测试


import Foundation
import XCTest
import Combine

/// Checks that Combine's bundled `prefix(untilOutputFrom:)` and the custom
/// `prefix(until…From:)` operators complete the downstream when the trigger publisher
/// produces the event each operator listens for.
///
/// Every test merges two upstream subjects, applies the operator under test, and delegates
/// the shared scenario to `doTestPublisherCompletes`.
final class PrefixUntilCompletionFromTests: TestCase {

    // MARK: Combine's bundled
    func test_that_publisher___prefix_untilOutputFrom___completes_when_received_output() {

        let finishTriggeringSubject = PassthroughSubject<Void, Never>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send()
            }
        ) {
            $0.merge(with: $1).prefix(untilOutputFrom: finishTriggeringSubject).eraseToAnyPublisher()
        }
    }

    // MARK: Custom `prefix(until*`

    // MARK: `prefix:untilCompletionFrom`
    func test_that_publisher___prefix_untilCompletionFrom___completes_when_received_finish() {

        let finishTriggeringSubject = PassthroughSubject<Int, Never>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send(completion: .finished)
            }
        ) {
            $0.merge(with: $1).prefix(untilCompletionFrom: finishTriggeringSubject)
        }
    }

    // MARK: `prefix:untilOutputOrFinishFrom`
    func test_that_publisher___prefix_untilOutputOrFinishFrom___completes_when_received_finish() {

        let finishTriggeringSubject = PassthroughSubject<Int, Never>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send(completion: .finished)
            }
        ) {
            $0.merge(with: $1).prefix(untilOutputOrFinishFrom: finishTriggeringSubject)
        }
    }

    func test_that_publisher___prefix_untilOutputOrFinishFrom___completes_when_received_output() {

        let finishTriggeringSubject = PassthroughSubject<Void, Never>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send()
            }
        ) {
            $0.merge(with: $1).prefix(untilOutputOrFinishFrom: finishTriggeringSubject)
        }
    }

    // MARK: `prefix:untilOutputOrCompletionFrom`
    func test_that_publisher___prefix_untilOutputOrCompletionFrom___completes_when_received_finish() {

        let finishTriggeringSubject = PassthroughSubject<Int, Never>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send(completion: .finished)
            }
        ) {
            $0.merge(with: $1).prefix(untilOutputOrCompletionFrom: finishTriggeringSubject)
        }
    }

    func test_that_publisher___prefix_untilOutputOrCompletionFrom___completes_when_received_output() {

        let finishTriggeringSubject = PassthroughSubject<Void, Never>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send()
            }
        ) {
            $0.merge(with: $1).prefix(untilOutputOrCompletionFrom: finishTriggeringSubject)
        }
    }

    func test_that_publisher___prefix_untilOutputOrCompletionFrom___completes_when_received_failure() {
        struct ErrorMarker: Swift.Error {}
        let finishTriggeringSubject = PassthroughSubject<Void, ErrorMarker>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send(completion: .failure(ErrorMarker()))
            }
        ) {
            $0.merge(with: $1).prefix(untilOutputOrCompletionFrom: finishTriggeringSubject)
        }
    }

    // MARK: `prefix:untilFailureFrom`
    // NOTE: despite the `received_output` suffix in its name, this test triggers the
    // completion with a *failure* of the trigger publisher.
    func test_that_publisher___prefix_untilFailureFrom___completes_when_received_output() {
        struct ErrorMarker: Swift.Error {}
        let finishTriggeringSubject = PassthroughSubject<Void, ErrorMarker>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send(completion: .failure(ErrorMarker()))
            }
        ) {
            $0.merge(with: $1).prefix(untilFailureFrom: finishTriggeringSubject)
        }
    }

    // MARK: `prefix:untilEventFrom`
    // NOTE: "outut" in the method name is a typo for "output"; the name is kept as-is so
    // existing references to this test keep working.
    func test_that_publisher___prefix_untilEventFrom___outut_completes_when_received_output() {

        let finishTriggeringSubject = PassthroughSubject<Void, Never>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send()
            }
        ) {
            $0.merge(with: $1).prefix(untilEventFrom: finishTriggeringSubject, completionTriggerOptions: [.output])
        }
    }

    /// Runs the shared scenario against a publisher built from two upstream subjects.
    ///
    /// The helper subscribes to the publisher returned by `makePublisherToTest`, sends values
    /// through both subjects, calls `triggerFinish` in between, waits for the publisher under
    /// test to complete, and asserts that exactly `[1, 2, 4]` was received.
    ///
    /// - Parameters:
    ///   - line: Line used for assertion failures; defaults to the call site.
    ///   - triggerFinish: Makes the trigger publisher produce the event that should end the
    ///     publisher under test.
    ///   - makePublisherToTest: Builds the publisher under test from the two upstream
    ///     publishers (`first` and `second`).
    func doTestPublisherCompletes(
        _ line: UInt = #line,

        triggerFinish: () -> Void,

        makePublisherToTest: (
            _ first: AnyPublisher<Int, Never>,
            _ second: AnyPublisher<Int, Never>
        ) -> AnyPublisher<Int, Never>
    ) {

        let first = PassthroughSubject<Int, Never>()
        let second = PassthroughSubject<Int, Never>()

        let publisherToTest = makePublisherToTest(
            first.eraseToAnyPublisher(),
            second.eraseToAnyPublisher()
        )

        var returnValues = [Int]()
        let expectation = XCTestExpectation(description: self.debugDescription)

        let cancellable = publisherToTest
            .sink(
                receiveCompletion: { _ in expectation.fulfill() },
                receiveValue: { returnValues.append($0) }
            )

        first.send(1)
        first.send(2)
        first.send(completion: .finished)
        first.send(3)
        second.send(4)
        triggerFinish()
        second.send(5)

        wait(for: [expectation], timeout: 0.1)

        // output `3` sent by subject `first` is ignored, since it's sent after it has completed.
        // output `5` sent by subject `second` is ignored since it's sent after our `publisherToTest` has completed
        XCTAssertEqual(returnValues, [1, 2, 4], line: line)

        // Referencing `cancellable` here keeps the subscription alive until the assertions ran.
        XCTAssertNotNil(cancellable, line: line)
    }
}