我们可以避免 Subscribe() 在 RxJava 中阻塞吗

Can we Avoid Subscribe() from blocking in RxJava

我正在学习 RxJava 并编写代码

使用这个class创建一个随机整数列表

 public class Sources {
    
        public List<Integer> randomIntegerList() throws InterruptedException {
    
            TimeUnit.SECONDS.sleep(4);
    
            return IntStream
                    .range(0, 99)
                    .boxed()
                    .collect(Collectors.toCollection(ArrayList::new));
        }
    }

一些随机函数

public class Methods {

    public boolean isEven(Integer integer) {
        if (integer % 2 == 0) {
            return true;
        }

        return false;
    }
}

问题出在这里

public class DemoApplication {

    public static void main(String[] args) {
        new DemoApplication().executeSingle();
    }


    private void executeSingle() {
        Single.create(emitter -> {

            List<Integer> list = null;
            try {

                list = new Sources()
                        .randomIntegerList();
            } catch (InterruptedException e) {
                emitter.onError(e);
            }

            emitter.onSuccess(list);
        })
                .subscribe(
                        x -> {printData((List<Integer>) x);}
                );

        System.out.println("Finished");
    }

    private void printData(List<Integer> list) {
        list    .stream()
                .filter(y -> new Methods().isEven(y))
                .forEach(System.out::println);
    }
}

Finished 在延迟 4 秒后打印。基本上在订阅执行其功能后,我怎样才能使其成为非阻塞的。我想要的是线程继续执行下一行,当调用 printData() 函数时,该线程停止执行并首先执行 printData()。

默认情况下,Observable 在声明和订阅 Observer 的同一线程上执行。我稍微修改了你的例子(使用 rxJava 3)-

Observable<Integer> randIntegerObservable =
        Observable.create(
            emitter -> {
              List<Integer> integers = new Sources().randomIntegerList();
              for (Integer integer : integers) {
                emitter.onNext(integer);
              }
            });

    randIntegerObservable
        .filter(i -> i % 2 == 0)
        .subscribe(i -> System.out.println(Thread.currentThread().getName() + " - " + i.toString())); 

如果你运行上面的代码你会看到类似

的输出
main - 0
main - 2
main - 4
main - 6
main - 8 ...

您可以使用 subscribeOn() 运算符,它指示源在指定的调度程序(单独的线程)上触发排放

添加subscribeOn()后的代码-

Observable<Integer> randIntegerObservable =
        Observable.create(
            emitter -> {
              List<Integer> integers = new Sources().randomIntegerList();
              for (Integer integer : integers) {
                emitter.onNext(integer);
              }
            });

    randIntegerObservable
        .filter(i -> i % 2 == 0)
            .subscribeOn(Schedulers.io())
        .subscribe(
            i -> System.out.println(Thread.currentThread().getName() + " - " + i.toString())
            );

    TimeUnit.SECONDS.sleep(6);

和输出(不要忘记在最后添加睡眠否则主线程将在输出打印之前退出)-

RxCachedThreadScheduler-1 - 0
RxCachedThreadScheduler-1 - 2
RxCachedThreadScheduler-1 - 4
RxCachedThreadScheduler-1 - 6
RxCachedThreadScheduler-1 - 8 ..

勾选ReactiveX documentation for various Schedulers available. You can also use observeOn() instead of subscribeOn(). There is subtle difference between subscribeOn() and observeOn(), as observeOn() can be used to switch different scheduler downstream. You can find more detail in this