这个 ScalaRX 代码在做什么?

What is this ScalaRX code doing?

所以我对 Scala 和 RX 都很陌生。知道最多的人和实际编写此代码的人刚刚离开,我不确定发生了什么。这个结构遍布他的代码,我不太清楚它在做什么:

def foo(List[Long]) : Observable[Unit] =
  Observable {
    subscriber => {
      do some stuff
      subscriber.onNext()
      subscriber.onCompleted()
    }

我主要接到 do some stuff 和给订阅者的电话。我不明白的是,subscriber 是从哪里来的? subscriber => { 是否实例化订阅者? Observable { subscriber => { ... } } do/mean 是什么意思?

如果您查看 Observable companion object 文档,您会看到一个 apply 方法接受类型 (Subscriber[T]) ⇒ Unit 的函数。所以,当你调用 Observable{withSomeLambda} 时,这与调用 Observable.apply{withSomeLambda}

是一样的

而且,如果你一路走到 source code 你会发现这真的回来了

toScalaObservable(rx.Observable.create(f))

其中 f 是您传入的 lambda。

所以,subscriber只是lambda的参数。它由该函数的调用者传入。

此代码正在创建新的 Observable,如 here 所述。

基本上当一个下游组件订阅这个流时,这个回调就会被调用。在回调中,我们确定我们作为数据源何时调用 onNext(v: T),这是我们将生成的每个元素传递给它们的方式,以及我们何时调用 onCompleted(),这是我们告诉订阅者的方式我们已完成发送数据。

创建 Observable 后,您可以开始调用 Observable operators,这将导致另一个复合 Observable,或者导致终止条件,从而结束过程,通常会产生流的最终结果(通常是集合或聚合值)。

你没有在你的问题中使用 List,但通常如果你想从列表中创建一个反应流,你会调用 Observable.from().

PS: 我认为这是 RxJava 代码。