RxJava/Jersey2 的异步 RestAPI。线程问题?

Asynchronous RestAPIs with RxJava/Jersey2. Threading questions?

我们正在使用反应式编程制作 REST 原型 API。 如图所示,我们保持 3 层与之前同步 API 设计中使用的相同;

http://oi59.tinypic.com/339hhki.jpg

  1. API 使用 Jersey2 实现的层将处理 request/deserialize JSON 并移交给服务层。
  2. 实现业务的服务层-logic.Implemented 使用反应式编程 (RxJava)
  3. 服务用于持久化操作的 Dao 层Layer.Since我们使用 CouchBase,这将使用 CouchBase RxClient。

据我了解流程如下:

a) HTTP请求来了,Jersery会在"Container Thread pool".[=的RequestThread里面处理request/parse JSON/deserialize请求模型16=]

b)有了Jersey2 Async支持,RequestThread会return回到容器线程池,Service Layer会在Schedulers.computation( ) 调度程序。

@Path("/resource")
public class AsyncUserResource {
    @GET
    public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
 
       Observable<User> user = userService.getUser(...); //this is executed using Schedulers.computation() inside Service implementation


       user.subscribe(new Observer<User>() {

            @Override
            public void onCompleted() { 

            }

            @Override
            public void onError(Throwable e) {
                //handle error using ExceptionMappers

            }

            @Override
            public void onNext(User user) {
               asyncResponse.resume(user); 

            }});
    }        


}

c) DAO 内部的任何 IO 操作都将在单独的线程中使用 Schedulers.io() 到 运行 这些长时间处理操作。

我的问题是:

  1. 在实现 DAOs/Services 时,我是否应该在实现中隐藏正在使用的调度程序(线程)。

例如道:

public interface UserDao {
  public Observable<User> getUser();
}

在实施中,按如下方式指定计划是否是一种好的做法;

public Observable<User> getUser() {

        Observable<User> ret = Observable.create((subscriber)->{
            try {

                 //Do DB call
                 User u = null;
                 subscriber.onNext(u);
                 subscriber.onCompleted();

            }catch (Exception e) {
                subscriber.onError(e);  
            }

        });
        return ret.subscribeOn(Schedulers.io());
}

还是直接returnObservable,上层会相应的使用一个特定的Schedular更好?

  1. 由于 DAO 主要涉及 io/network 调用,我认为应该使用 Schedulars.io()。 服务层的业务逻辑怎么样?它们应该在 Schedulers.computation() (事件循环)中执行吗?

  2. 里面有两个线程池 JVM.One 一个是 "Container Thread Pool" 另一个是 "RxThread Pool" 被 Schedulers.io() 使用。 如何配置RxJava的pool settings/size ?

1) 在 RxJava 本身,如果一个方法需要一个调度器,我们创建两个重载:一个没有 Scheduler 参数,一个有它。然后前者使用合理的默认调度程序委托给后者。这样,API 消费者可能会选择接受默认设置或自行选择。

2) 这取决于你的计算。如果计算花费的时间与等待 IO 的时间相似,您可以将计算移至计算调度程序中,从而释放 IO 中缓存的工作线程以执行更多阻塞。否则,您可以只在同一个调度程序上执行业务逻辑。

3) 目前您无法在 RxJava 中配置池大小。计算将始终使用 Runtime.availableProcessors() 并且 IO 将始终充当无界缓存线程池。如果您可以忍受事件线程跳跃(意思是:它们保证是串行的,但一个事件可能在线程 1 上执行,随后在线程 2 上执行),您可以通过 Schedulers.from().[=12 使用自己的 ExecutorServices =]