How to run this Observable asynchronously?

I have the following service, which simulates a blocking IO operation:

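public class StudentService {

    public List<Student> getStudentAslist() {
        try {
            Thread.sleep(10000); // simulate a slow, blocking IO call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        .....
        return students;
    }
}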

Now I am trying to run this service in a Java 8 environment using FutureTask<T>:

StudentService studentService = new StudentService();
FutureTask<List<Student>> future = new FutureTask<>(() -> studentService.getStudentAslist());

Worker worker = Schedulers.io().createWorker();
worker.schedule(() -> future.run());

Observable<Student> observable2 = Observable.from(future).flatMap(Observable::from);
observable2.filter((s) -> s.getAge() > 40).subscribe(student -> System.out.println(student));

System.out.println("end");

However, the final "end" is printed only after 10 seconds, so it looks like everything is running synchronously.

How can I make it run reactively?

I think what you are looking for is .subscribeOn:

FutureTask<List<String>> future = new FutureTask<>(this::getStudentAslist);

Scheduler.Worker worker = Schedulers.io().createWorker();
worker.schedule(future::run);

Observable.from(future)
        .subscribeOn(Schedulers.computation())
        .flatMapIterable(s -> s)
        .subscribe(System.out::println);
System.out.println("marker");


// Keep the main thread alive so the background work has time to be observed.
TimeUnit.SECONDS.sleep(3);

For reference:

public List<String> getStudentAslist(){
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return Collections.singletonList("finished");
}
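
To see why the version in the question blocks: as far as I know, Observable.from(Future) calls future.get() on the thread that subscribes, so without subscribeOn the main thread waits for the result even though the task itself runs on an io worker. The sketch below reproduces that effect with plain JDK classes only (no RxJava), so it is an analogy rather than the actual RxJava code path. The class and method names are made up for illustration, and the delay is shortened.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

class BlockingGetSketch {

    // Hypothetical stand-in for the slow service call.
    static List<String> slowCall(long delayMillis) {
        try {
            TimeUnit.MILLISECONDS.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Collections.singletonList("finished");
    }

    // Blocks the current thread until the task has completed.
    static List<String> await(FutureTask<List<String>> future) {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Like the question: the task runs on a worker, but get() is called
    // on the caller's thread, so the caller waits for the whole delay.
    static long callerWaitWhenGetOnCaller(long delayMillis) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        FutureTask<List<String>> future = new FutureTask<>(() -> slowCall(delayMillis));
        long start = System.nanoTime();
        worker.execute(future);
        await(future); // the caller is stuck here
        long waited = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        worker.shutdown();
        return waited;
    }

    // Like subscribeOn: the blocking get() is moved to a second thread,
    // so the caller continues immediately.
    static long callerWaitWhenGetOffCaller(long delayMillis) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        ExecutorService subscriber = Executors.newSingleThreadExecutor();
        FutureTask<List<String>> future = new FutureTask<>(() -> slowCall(delayMillis));
        long start = System.nanoTime();
        worker.execute(future);
        subscriber.execute(() -> System.out.println(await(future)));
        long waited = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        // Cleanup only: let the background threads finish before returning.
        worker.shutdown();
        subscriber.shutdown();
        try {
            subscriber.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return waited;
    }

    public static void main(String[] args) {
        System.out.println("get() on caller:  waited ~" + callerWaitWhenGetOnCaller(500) + " ms");
        System.out.println("get() off caller: waited ~" + callerWaitWhenGetOffCaller(500) + " ms");
    }
}
```

The first measurement should be roughly the full delay and the second close to zero, which is the same difference that adding subscribeOn makes to when "marker" is printed.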

You can use defer to move all of the work into the Observable:

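// Typed as List<Student> so that it matches studentsObservable below
// (this assumes getStudentAslist returns List<Student> here).
FutureTask<List<Student>> future = new FutureTask<>(this::getStudentAslist);

Observable<List<Student>> studentsObservable = Observable
        .defer(() -> {
            future.run(); // executes on whichever thread subscribes
            return Observable.from(future);
        });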

Then use subscribeOn to decide which thread this Observable will run on:

studentsObservable
        .flatMapIterable(students -> students)
        .filter((s) -> s.getAge() > 40)
        .subscribeOn(Schedulers.computation())
        .subscribe(student -> System.out.println(student));
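
One thing to keep in mind with the defer version above: a FutureTask runs its callable at most once, and calling run() again after completion does nothing. An Observable that captures a single FutureTask would therefore hand the cached result to later subscribers instead of calling the service again. Creating the task inside the deferred lambda should give each subscriber a fresh call. Here is a plain JDK sketch of that difference (no RxJava; a Supplier stands in for "subscribe", and all names are hypothetical):

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class SingleUseFutureTaskSketch {

    // Runs the task (a no-op if it has already completed) and returns its result.
    static String runAndGet(FutureTask<String> task) {
        task.run();
        try {
            return task.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // One shared task: every "subscription" sees the same cached value,
    // and the counter is incremented only once.
    static Supplier<String> sharedTask(AtomicInteger calls) {
        FutureTask<String> task =
                new FutureTask<>(() -> "result #" + calls.incrementAndGet());
        return () -> runAndGet(task);
    }

    // A fresh task per "subscription": the underlying call is repeated each time.
    static Supplier<String> freshTaskPerCall(AtomicInteger calls) {
        return () -> {
            FutureTask<String> task =
                    new FutureTask<>(() -> "result #" + calls.incrementAndGet());
            return runAndGet(task);
        };
    }

    public static void main(String[] args) {
        Supplier<String> shared = sharedTask(new AtomicInteger());
        System.out.println(shared.get());
        System.out.println(shared.get());

        Supplier<String> fresh = freshTaskPerCall(new AtomicInteger());
        System.out.println(fresh.get());
        System.out.println(fresh.get());
    }
}
```

Whether the cached behaviour is a problem depends on whether the Observable is subscribed to more than once; for a single subscription, as in the question, both variants behave the same.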