为每个新的成功上游合并 FlatMap 副本

Combine FlatMap duplicates for every new successful upstream

在我的 Combine 管道中,我试图通过使用 flatMap 捕获原始发布者之外的错误来防止上游终止:

PlaygroundPage.current.needsIndefiniteExecution = true

let cancellable = Timer.publish(every: 1, on: .main, in: .default)
    .autoconnect()
    .flatMap { _ in
        Timer.publish(every: 1, on: .main, in: .default)
            .autoconnect()
            .tryMap { value throws -> String? in
                return "\(Date())"
            }
            .catch { _ in
                Just(nil)
            }
            .filter { [=10=] != nil }
    }
    .sink { _ in
        print("complete")
        PlaygroundPage.current.finishExecution()
    } receiveValue: {
        print("sink: \(String(describing: [=10=]))")
    }

虽然当错误被替换为 nil 时,这永远不会终止上游发布者,但出于某种原因,它会复制每个上游事件。上面的代码将导致:

sink: Optional("2021-01-16 06:49:58 +0000")
sink: Optional("2021-01-16 06:49:59 +0000")
sink: Optional("2021-01-16 06:49:59 +0000")
sink: Optional("2021-01-16 06:50:00 +0000")
sink: Optional("2021-01-16 06:50:00 +0000")
sink: Optional("2021-01-16 06:50:00 +0000")
sink: Optional("2021-01-16 06:50:01 +0000")
sink: Optional("2021-01-16 06:50:01 +0000")
sink: Optional("2021-01-16 06:50:01 +0000")
sink: Optional("2021-01-16 06:50:01 +0000")
sink: Optional("2021-01-16 06:50:02 +0000")
sink: Optional("2021-01-16 06:50:02 +0000")
sink: Optional("2021-01-16 06:50:02 +0000")
sink: Optional("2021-01-16 06:50:02 +0000")
sink: Optional("2021-01-16 06:50:02 +0000")
sink: Optional("2021-01-16 06:50:03 +0000")
sink: Optional("2021-01-16 06:50:03 +0000")
sink: Optional("2021-01-16 06:50:03 +0000")
sink: Optional("2021-01-16 06:50:03 +0000")
sink: Optional("2021-01-16 06:50:03 +0000")
sink: Optional("2021-01-16 06:50:03 +0000")

在第一个事件中,输出只有一次。第二个上游事件,是两次。第三个上游导致三个输出,依此类推。

似乎 flatMap 在每个上游事件中创建一个新的发布者并保持活动状态。我如何取消以前的 flatMaps 并始终获得一个值,或者有更好的方法来做到这一点?我不想让过去的上游事件继续存在,我只关心一个值而不是旧值。我想以这样的方式结束:

sink: Optional("2021-01-16 06:49:58 +0000")
sink: Optional("2021-01-16 06:49:59 +0000")
sink: Optional("2021-01-16 06:50:00 +0000")
sink: Optional("2021-01-16 06:50:01 +0000")
sink: Optional("2021-01-16 06:50:02 +0000")
sink: Optional("2021-01-16 06:50:03 +0000")
sink: Optional("2021-01-16 06:50:03 +0000")

FlatMap 的关闭 returns 每个上游值的发布者。返回的发布者可以发出多个值。因此,在您的示例中,每秒 FlatMap returns 一个新的 Timer 发布者每秒都会发布值。

相反,使用 Just 发布者发出每个上游值和

Timer.publish(every: 1, on: .main, in: .default)
    .autoconnect()
    .flatMap { value in
        Just(value)
            .tryMap { value throws -> String? in
                return "\(Date())"
            }
            .catch { _ in
                Just(nil)
            }
            .filter { [=10=] != nil }
    }

对于您的示例来说,使用 compactMap 并使用 try? 进行抛出调用可能更简单,而不是 flatMap 方法:

Timer.publish(every: 1, on: .main, in: .default)
    .autoconnect()
    .compactMap { value in
        try? someThrowingFunction(value)
    }