Observable.create 和 fromPublisher 有什么区别?

What is the difference between Observable.create and fromPublisher?

将下面源代码中的 Observable.create 更改为 Observable.fromPublisher 不起作用。 (如果样本不存在,则全部订阅,但如果样本存在,则不订阅任何内容。)

Observable.create 和 fromPublisher 有什么区别?

import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class SampleMain {
    public static void main(String[] args) {
        Observable<String> o = Observable.create(s -> {
            new Thread(() -> {
                for (int i=0; i<100; i++) {
                    s.onNext("Hello Observable.fromPublisher() A" + i);
                    s.onNext("Hello Observable.fromPublisher() B" + i);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                s.onComplete();
            }).start();
        });
        o
                .sample(1, TimeUnit.MILLISECONDS)
                .subscribe(System.out::println);
    }
}

fromPublisher 需要正确实施的 org.reactivestreams.Publisher 以纪念 rules。这些 Publisher 通常来自第三方库或 API。

create 内置了基础设施,可以将更简单的发射器样式 API 转换为 Observable,这样开发人员就不必担心太多底层协议。

我还可以让您注意 fromPublisher 的 javadoc:

The Publisher must follow the Reactive-Streams specification. Violating the specification may result in undefined behavior.

If possible, use create(ObservableOnSubscribe) to create a source-like Observable instead.

Note that even though Publisher appears to be a functional interface, it is not recommended to implement it through a lambda as the specification requires state management that is not achievable with a stateless lambda.