Combine 中 flatMap 中发布者的过早完成

Premature completion of publisher in flatMap in Combine

我有这个最小的例子:

import UIKit
import Combine

var values = [1,2,3,4,5]

var cancel = values.publisher
    .delay(for: 0.1, scheduler: DispatchQueue.global())
    .print()
    .flatMap() { i in
        [i].publisher.first()
    }
    .sink { completion in
        print("Received Completion: \(completion)")
    } receiveValue: { v in
        print("Received Value: \(v)")
    }

我的期望是源发布者将 1 到 5 的值发送到流中。每个数字都被转换成(只是为了它)一个新的发布者,它恰好发出第一个值然后完成。由于这是对每个数字完成的,因此我希望所有值都到达接收器。然而,情况并非如此。输出如下所示:

request unlimited
receive value: (1)
Received Value: 1
receive value: (2)
Received Value: 2
receive value: (4)
Received Value: 4
receive finished
Received Completion: finished
receive value: (3)
receive value: (5)

事实上,只有 3 个值在完成事件到达之前到达接收器。为什么是这样? documentation 状态:

successful completion of the new Publisher does not complete the overall stream.

更奇怪的是,当您将 .flatMap() 替换为 .flatMap(maxPublishers: .max(1)) 并将 .share() 添加到原始源发布者时,只有第一个值会进入接收器。

非常感谢任何指点!

您对 DispatchQueue.global() 的使用存在问题。一旦 values.publisher 收到订阅,values.publisher 就会将其所有输出及其完成发送到下游的 delay 运算符。 delay 运算符在 DispatchQueue.global() 0.1 秒后将六个块(五个用于输出数字,一个用于完成)安排到 运行。

DispatchQueue.global() 是一个 并发 队列。这意味着它可以同时 运行 任意数量的块。因此无法保证六个计划块中的哪一个将首先完成。

使用并发队列作为 Combine 调度程序通常不是一个好主意。您应该改用串行队列。这是一个简单的例子:

var cancel = values.publisher
    .delay(for: 0.1, scheduler: DispatchQueue(label: "my queue"))
    .print()
    .flatMap() { i in
        [i].publisher.first()
    }
    .sink { completion in
        print("Received Completion: \(completion)")
    } receiveValue: { v in
        print("Received Value: \(v)")
    }

但您可能希望创建一次队列并将其存储在 属性 中,这样您就可以将其用于多个发布者。

如果您确实希望输出在主队列上传递(可能是因为您要使用它们来更新视图),您应该只使用 DispatchQueue.main.