Swift 组合:`prefix(untilCompletionFrom)`?
Swift Combine: `prefix(untilCompletionFrom)`?
RxJava 有一个 takeUntil
运算符,文档将其描述为:
discard any items emitted by an Observable after a second Observable emits an item or terminates
最后一部分是我想用Combine
实现的。但我还没有找到任何等效的运算符。我找到的唯一类似运算符是 prefix: untilOutputFrom
,文档:
Republishes elements until another publisher emits an element.
所以给出:
fooPublisher.prefix(untilOutputFrom: barPublisher)
不像我想要的那样行事,因为它只在 barPublisher
发出一个元素时才结束。但我希望一些操作员在 barPublisher
完成时完成。
我是不是漏掉了什么?我想要的运营商是否真的以其他名称存在?
我最终自己实现了这个运算符。事实上,我创建了五个运算符,它们都基于相同的共享 (internal
) 函数。我为它们添加了一些单元测试,它们似乎工作正常。
如果您发现任何bugs/room需要改进或更好的解决方案,请告诉我
用法
// finish when `barPublisher` completes with `.finish`
fooPublisher.prefix(untilFinishFrom: barPublisher)
// finish when `barPublisher` completes with `.output` OR `.finish`
fooPublisher.prefix(untilOutputOrFinishFrom: barPublisher)
// finish when `barPublisher` completes either with `.finish` OR `.failure`
fooPublisher.prefix(untilCompletionFrom: barPublisher)
// finish when `barPublisher` completes either with `.output` OR `.finish` OR `.failure`
fooPublisher.prefix(untilCompletionFrom: barPublisher)
// finish when `barPublisher` completes with `.failure`
// (I'm not so sure how useful this is... might be better to handle with an of
// the operators working with errors)
fooPublisher.prefix(untilFailureFrom: barPublisher)
解决方案
实施
internal extension Publisher {
func prefix<CompletionTrigger>(
untilEventFrom completionTriggeringPublisher: CompletionTrigger,
completionTriggerOptions: Publishers.CompletionTriggerOptions
) -> AnyPublisher<Output, Failure> where CompletionTrigger: Publisher {
guard completionTriggerOptions != .output else {
// Fallback to Combine's bundled operator
return self.prefix(untilOutputFrom: completionTriggeringPublisher).eraseToAnyPublisher()
}
let completionAsOutputSubject = PassthroughSubject<Void, Never>()
var cancellable: Cancellable? = completionTriggeringPublisher
.sink(
receiveCompletion: { completion in
switch completion {
case .failure:
guard completionTriggerOptions.contains(.failure) else { return }
completionAsOutputSubject.send()
case .finished:
guard completionTriggerOptions.contains(.finish) else { return }
completionAsOutputSubject.send()
}
},
receiveValue: { _ in
guard completionTriggerOptions.contains(.output) else { return }
completionAsOutputSubject.send()
}
)
func cleanUp() {
cancellable = nil
}
return self.prefix(untilOutputFrom: completionAsOutputSubject)
.handleEvents(
receiveCompletion: { _ in cleanUp() },
receiveCancel: {
cancellable?.cancel()
cleanUp()
}
)
.eraseToAnyPublisher()
}
}
助手
// MARK: Publishers + CompletionTriggerOptions
public extension Publishers {
struct CompletionTriggerOptions: OptionSet {
public let rawValue: Int
public init(rawValue: Int) {
self.rawValue = rawValue
}
}
}
public extension Publishers.CompletionTriggerOptions {
static let output = Self(rawValue: 1 << 0)
static let finish = Self(rawValue: 1 << 1)
static let failure = Self(rawValue: 1 << 2)
static let completion: Self = [.finish, .failure]
static let all: Self = [.output, .finish, .failure]
}
运算符
public extension Publisher {
func prefix<CompletionTrigger>(
untilCompletionFrom completionTriggeringPublisher: CompletionTrigger
) -> AnyPublisher<Output, Failure>
where CompletionTrigger: Publisher
{
prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: .completion)
}
func prefix<CompletionTrigger>(
untilFinishFrom completionTriggeringPublisher: CompletionTrigger
) -> AnyPublisher<Output, Failure>
where CompletionTrigger: Publisher
{
prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: .finish)
}
func prefix<CompletionTrigger>(
untilFailureFrom completionTriggeringPublisher: CompletionTrigger
) -> AnyPublisher<Output, Failure>
where CompletionTrigger: Publisher
{
prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: .failure)
}
func prefix<CompletionTrigger>(
untilOutputOrFinishFrom completionTriggeringPublisher: CompletionTrigger
) -> AnyPublisher<Output, Failure>
where CompletionTrigger: Publisher
{
prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: [.output, .finish])
}
///
func prefix<CompletionTrigger>(
untilOutputOrCompletionFrom completionTriggeringPublisher: CompletionTrigger
) -> AnyPublisher<Output, Failure>
where CompletionTrigger: Publisher
{
prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: [.output, .completion])
}
}
单元测试
import Foundation
import XCTest
import Combine
final class PrefixUntilCompletionFromTests: TestCase {
// MARK: Combine's bundled
func test_that_publisher___prefix_untilOutputFrom___completes_when_received_output() {
let finishTriggeringSubject = PassthroughSubject<Void, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send()
}
) {
return [=14=].merge(with: ).prefix(untilOutputFrom: finishTriggeringSubject).eraseToAnyPublisher()
}
}
// MARK: Custom `prefix(until*`
// MARK: `prefix:untilCompletionFrom`
func test_that_publisher___prefix_untilCompletionFrom___completes_when_received_finish() {
let finishTriggeringSubject = PassthroughSubject<Int, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send(completion: .finished)
}
) {
[=14=].merge(with: ).prefix(untilCompletionFrom: finishTriggeringSubject)
}
}
// MARK: `prefix:untilOutputOrFinishFrom`
func test_that_publisher___prefix_untilOutputOrFinishFrom___completes_when_received_finish() {
let finishTriggeringSubject = PassthroughSubject<Int, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send(completion: .finished)
}
) {
[=14=].merge(with: ).prefix(untilOutputOrFinishFrom: finishTriggeringSubject)
}
}
func test_that_publisher___prefix_untilOutputOrFinishFrom___completes_when_received_output() {
let finishTriggeringSubject = PassthroughSubject<Void, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send()
}
) {
[=14=].merge(with: ).prefix(untilOutputOrFinishFrom: finishTriggeringSubject)
}
}
// MARK: `prefix:untilOutputOrCompletionFrom`
func test_that_publisher___prefix_untilOutputOrCompletionFrom___completes_when_received_finish() {
let finishTriggeringSubject = PassthroughSubject<Int, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send(completion: .finished)
}
) {
[=14=].merge(with: ).prefix(untilOutputOrCompletionFrom: finishTriggeringSubject)
}
}
func test_that_publisher___prefix_untilOutputOrCompletionFrom___completes_when_received_output() {
let finishTriggeringSubject = PassthroughSubject<Void, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send()
}
) {
[=14=].merge(with: ).prefix(untilOutputOrCompletionFrom: finishTriggeringSubject)
}
}
func test_that_publisher___prefix_untilOutputOrCompletionFrom___completes_when_received_failure() {
struct ErrorMarker: Swift.Error {}
let finishTriggeringSubject = PassthroughSubject<Void, ErrorMarker>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send(completion: .failure(ErrorMarker()))
}
) {
[=14=].merge(with: ).prefix(untilOutputOrCompletionFrom: finishTriggeringSubject)
}
}
// MARK: `prefix:untilFailureFrom`
func test_that_publisher___prefix_untilFailureFrom___completes_when_received_output() {
struct ErrorMarker: Swift.Error {}
let finishTriggeringSubject = PassthroughSubject<Void, ErrorMarker>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send(completion: .failure(ErrorMarker()))
}
) {
[=14=].merge(with: ).prefix(untilFailureFrom: finishTriggeringSubject)
}
}
// MARK: `prefix:untilEventFrom`
func test_that_publisher___prefix_untilEventFrom___outut_completes_when_received_output() {
let finishTriggeringSubject = PassthroughSubject<Void, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send()
}
) {
[=14=].merge(with: ).prefix(untilEventFrom: finishTriggeringSubject, completionTriggerOptions: [.output])
}
}
func doTestPublisherCompletes(
_ line: UInt = #line,
triggerFinish: () -> Void,
makePublisherToTest: (
_ first: AnyPublisher<Int, Never>,
_ second: AnyPublisher<Int, Never>
) -> AnyPublisher<Int, Never>
) {
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
let publisherToTest = makePublisherToTest(
first.eraseToAnyPublisher(),
second.eraseToAnyPublisher()
)
var returnValues = [Int]()
let expectation = XCTestExpectation(description: self.debugDescription)
let cancellable = publisherToTest
.sink(
receiveCompletion: { _ in expectation.fulfill() },
receiveValue: { returnValues.append([=14=]) }
)
first.send(1)
first.send(2)
first.send(completion: .finished)
first.send(3)
second.send(4)
triggerFinish()
second.send(5)
wait(for: [expectation], timeout: 0.1)
// output `3` sent by subject `first` is ignored, since it's sent after it has completed.
// output `5` sent by subject `second` is ignored since it's sent after our `publisherToTest` has completed
XCTAssertEqual(returnValues, [1, 2, 4], line: line)
XCTAssertNotNil(cancellable, line: line)
}
}
RxJava 有一个 takeUntil
运算符,文档将其描述为:
discard any items emitted by an Observable after a second Observable emits an item or terminates
最后一部分是我想用Combine
实现的。但我还没有找到任何等效的运算符。我找到的唯一类似运算符是 prefix: untilOutputFrom
,文档:
Republishes elements until another publisher emits an element.
所以给出:
fooPublisher.prefix(untilOutputFrom: barPublisher)
不像我想要的那样行事,因为它只在 barPublisher
发出一个元素时才结束。但我希望一些操作员在 barPublisher
完成时完成。
我是不是漏掉了什么?我想要的运营商是否真的以其他名称存在?
我最终自己实现了这个运算符。事实上,我创建了五个运算符,它们都基于相同的共享 (internal
) 函数。我为它们添加了一些单元测试,它们似乎工作正常。
如果您发现任何bugs/room需要改进或更好的解决方案,请告诉我
用法
// finish when `barPublisher` completes with `.finish`
fooPublisher.prefix(untilFinishFrom: barPublisher)
// finish when `barPublisher` completes with `.output` OR `.finish`
fooPublisher.prefix(untilOutputOrFinishFrom: barPublisher)
// finish when `barPublisher` completes either with `.finish` OR `.failure`
fooPublisher.prefix(untilCompletionFrom: barPublisher)
// finish when `barPublisher` completes either with `.output` OR `.finish` OR `.failure`
fooPublisher.prefix(untilCompletionFrom: barPublisher)
// finish when `barPublisher` completes with `.failure`
// (I'm not so sure how useful this is... might be better to handle with an of
// the operators working with errors)
fooPublisher.prefix(untilFailureFrom: barPublisher)
解决方案
实施
internal extension Publisher {
func prefix<CompletionTrigger>(
untilEventFrom completionTriggeringPublisher: CompletionTrigger,
completionTriggerOptions: Publishers.CompletionTriggerOptions
) -> AnyPublisher<Output, Failure> where CompletionTrigger: Publisher {
guard completionTriggerOptions != .output else {
// Fallback to Combine's bundled operator
return self.prefix(untilOutputFrom: completionTriggeringPublisher).eraseToAnyPublisher()
}
let completionAsOutputSubject = PassthroughSubject<Void, Never>()
var cancellable: Cancellable? = completionTriggeringPublisher
.sink(
receiveCompletion: { completion in
switch completion {
case .failure:
guard completionTriggerOptions.contains(.failure) else { return }
completionAsOutputSubject.send()
case .finished:
guard completionTriggerOptions.contains(.finish) else { return }
completionAsOutputSubject.send()
}
},
receiveValue: { _ in
guard completionTriggerOptions.contains(.output) else { return }
completionAsOutputSubject.send()
}
)
func cleanUp() {
cancellable = nil
}
return self.prefix(untilOutputFrom: completionAsOutputSubject)
.handleEvents(
receiveCompletion: { _ in cleanUp() },
receiveCancel: {
cancellable?.cancel()
cleanUp()
}
)
.eraseToAnyPublisher()
}
}
助手
// MARK: Publishers + CompletionTriggerOptions
public extension Publishers {
struct CompletionTriggerOptions: OptionSet {
public let rawValue: Int
public init(rawValue: Int) {
self.rawValue = rawValue
}
}
}
public extension Publishers.CompletionTriggerOptions {
static let output = Self(rawValue: 1 << 0)
static let finish = Self(rawValue: 1 << 1)
static let failure = Self(rawValue: 1 << 2)
static let completion: Self = [.finish, .failure]
static let all: Self = [.output, .finish, .failure]
}
运算符
public extension Publisher {
func prefix<CompletionTrigger>(
untilCompletionFrom completionTriggeringPublisher: CompletionTrigger
) -> AnyPublisher<Output, Failure>
where CompletionTrigger: Publisher
{
prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: .completion)
}
func prefix<CompletionTrigger>(
untilFinishFrom completionTriggeringPublisher: CompletionTrigger
) -> AnyPublisher<Output, Failure>
where CompletionTrigger: Publisher
{
prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: .finish)
}
func prefix<CompletionTrigger>(
untilFailureFrom completionTriggeringPublisher: CompletionTrigger
) -> AnyPublisher<Output, Failure>
where CompletionTrigger: Publisher
{
prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: .failure)
}
func prefix<CompletionTrigger>(
untilOutputOrFinishFrom completionTriggeringPublisher: CompletionTrigger
) -> AnyPublisher<Output, Failure>
where CompletionTrigger: Publisher
{
prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: [.output, .finish])
}
///
func prefix<CompletionTrigger>(
untilOutputOrCompletionFrom completionTriggeringPublisher: CompletionTrigger
) -> AnyPublisher<Output, Failure>
where CompletionTrigger: Publisher
{
prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: [.output, .completion])
}
}
单元测试
import Foundation
import XCTest
import Combine
final class PrefixUntilCompletionFromTests: TestCase {
// MARK: Combine's bundled
func test_that_publisher___prefix_untilOutputFrom___completes_when_received_output() {
let finishTriggeringSubject = PassthroughSubject<Void, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send()
}
) {
return [=14=].merge(with: ).prefix(untilOutputFrom: finishTriggeringSubject).eraseToAnyPublisher()
}
}
// MARK: Custom `prefix(until*`
// MARK: `prefix:untilCompletionFrom`
func test_that_publisher___prefix_untilCompletionFrom___completes_when_received_finish() {
let finishTriggeringSubject = PassthroughSubject<Int, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send(completion: .finished)
}
) {
[=14=].merge(with: ).prefix(untilCompletionFrom: finishTriggeringSubject)
}
}
// MARK: `prefix:untilOutputOrFinishFrom`
func test_that_publisher___prefix_untilOutputOrFinishFrom___completes_when_received_finish() {
let finishTriggeringSubject = PassthroughSubject<Int, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send(completion: .finished)
}
) {
[=14=].merge(with: ).prefix(untilOutputOrFinishFrom: finishTriggeringSubject)
}
}
func test_that_publisher___prefix_untilOutputOrFinishFrom___completes_when_received_output() {
let finishTriggeringSubject = PassthroughSubject<Void, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send()
}
) {
[=14=].merge(with: ).prefix(untilOutputOrFinishFrom: finishTriggeringSubject)
}
}
// MARK: `prefix:untilOutputOrCompletionFrom`
func test_that_publisher___prefix_untilOutputOrCompletionFrom___completes_when_received_finish() {
let finishTriggeringSubject = PassthroughSubject<Int, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send(completion: .finished)
}
) {
[=14=].merge(with: ).prefix(untilOutputOrCompletionFrom: finishTriggeringSubject)
}
}
func test_that_publisher___prefix_untilOutputOrCompletionFrom___completes_when_received_output() {
let finishTriggeringSubject = PassthroughSubject<Void, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send()
}
) {
[=14=].merge(with: ).prefix(untilOutputOrCompletionFrom: finishTriggeringSubject)
}
}
func test_that_publisher___prefix_untilOutputOrCompletionFrom___completes_when_received_failure() {
struct ErrorMarker: Swift.Error {}
let finishTriggeringSubject = PassthroughSubject<Void, ErrorMarker>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send(completion: .failure(ErrorMarker()))
}
) {
[=14=].merge(with: ).prefix(untilOutputOrCompletionFrom: finishTriggeringSubject)
}
}
// MARK: `prefix:untilFailureFrom`
func test_that_publisher___prefix_untilFailureFrom___completes_when_received_output() {
struct ErrorMarker: Swift.Error {}
let finishTriggeringSubject = PassthroughSubject<Void, ErrorMarker>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send(completion: .failure(ErrorMarker()))
}
) {
[=14=].merge(with: ).prefix(untilFailureFrom: finishTriggeringSubject)
}
}
// MARK: `prefix:untilEventFrom`
func test_that_publisher___prefix_untilEventFrom___outut_completes_when_received_output() {
let finishTriggeringSubject = PassthroughSubject<Void, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send()
}
) {
[=14=].merge(with: ).prefix(untilEventFrom: finishTriggeringSubject, completionTriggerOptions: [.output])
}
}
func doTestPublisherCompletes(
_ line: UInt = #line,
triggerFinish: () -> Void,
makePublisherToTest: (
_ first: AnyPublisher<Int, Never>,
_ second: AnyPublisher<Int, Never>
) -> AnyPublisher<Int, Never>
) {
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
let publisherToTest = makePublisherToTest(
first.eraseToAnyPublisher(),
second.eraseToAnyPublisher()
)
var returnValues = [Int]()
let expectation = XCTestExpectation(description: self.debugDescription)
let cancellable = publisherToTest
.sink(
receiveCompletion: { _ in expectation.fulfill() },
receiveValue: { returnValues.append([=14=]) }
)
first.send(1)
first.send(2)
first.send(completion: .finished)
first.send(3)
second.send(4)
triggerFinish()
second.send(5)
wait(for: [expectation], timeout: 0.1)
// output `3` sent by subject `first` is ignored, since it's sent after it has completed.
// output `5` sent by subject `second` is ignored since it's sent after our `publisherToTest` has completed
XCTAssertEqual(returnValues, [1, 2, 4], line: line)
XCTAssertNotNil(cancellable, line: line)
}
}