为什么 Combine 的 receive(on:) 运算符会吞下错误?

Why does Combine's receive(on:) operator swallow errors?

以下管道:

enum MyError: Error {
  case oops
}
let cancel = Fail<Int, Error>(error: MyError.oops)
  .print("1>")
  .print("2>")
  .sink(receiveCompletion: { status in
    print("status>", status)
  }) { value in
    print("value>", value)
}

输出:

1>: receive subscription: (Empty)
2>: receive subscription: (Print)
2>: request unlimited
1>: request unlimited
1>: receive error: (oops)
2>: receive error: (oops)
status> failure(__lldb_expr_126.MyError.oops)

问题

但是,如果我在前面的管道中插入一个 receive(on:) 运算符:

enum MyError: Error {
  case oops
}
let cancel = Fail<Int, Error>(error: MyError.oops)
  .print("1>")
  .receive(on: RunLoop.main)
  .print("2>")
  .sink(receiveCompletion: { status in
    print("status>", status)
  }) { value in
    print("value>", value)
}

输出为:

1>: receive subscription: (Empty)
1>: receive error: (oops)

receive 运算符似乎使管道短路。我还没有看到其他发布商发生这种情况,只是当我使用 FailPassthroughSubject 发布商时。

这是预期的行为吗?如果有,是什么原因?


解决方法

以下是创建与 receive(on:) 发布商合作的失败发布商的示例:

struct FooModel: Codable {
  let title: String
}

func failPublisher() -> AnyPublisher<FooModel, Error> {
  return Just(Data(base64Encoded: "")!)
    .decode(type: FooModel.self, decoder: JSONDecoder())
    .eraseToAnyPublisher()
}

let cancel = failPublisher()
  .print("1>")
  .receive(on: RunLoop.main)
  .print("2>")
  .sink(receiveCompletion: { status in
    print("status>", status)
  }) { value in
    print("value>", value)
}

您可能 运行 遇到了 this post 中讨论的相同问题。显然 receive(on:) 将通过给定的调度程序异步发送 all 消息,包括订阅消息。所以发生的事情是在订阅事件有机会异步发送之前发送错误,因此当下一个事件到来时,没有订阅者附加到 receive 发布者。

但是,从 developer beta 1 of iOS 13.3 开始,他们似乎正在对此进行更改:

As of developer beta 1 of iOS 13.3 (and associated releases for other platforms), we've changed the behavior of receive(on:) plus other Scheduler operators to synchronously send their subscription downstream. Previously, they would "async" it to the provided scheduler.