RxJava/Jersey2 的异步 RestAPI。线程问题?
Asynchronous RestAPIs with RxJava/Jersey2. Threading questions?
我们正在使用反应式编程制作 REST 原型 API。
如图所示,我们保持 3 层与之前同步 API 设计中使用的相同;
http://oi59.tinypic.com/339hhki.jpg
- API 使用 Jersey2 实现的层将处理 request/deserialize JSON 并移交给服务层。
- 实现业务的服务层-logic.Implemented 使用反应式编程 (RxJava)
- 服务用于持久化操作的 Dao 层Layer.Since我们使用 CouchBase,这将使用 CouchBase RxClient。
据我了解流程如下:
a) HTTP请求来了,Jersery会在"Container Thread pool".[=的RequestThread里面处理request/parse JSON/deserialize请求模型16=]
b)有了Jersey2 Async支持,RequestThread会return回到容器线程池,Service Layer会在Schedulers.computation( ) 调度程序。
@Path("/resource")
public class AsyncUserResource {
@GET
public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
Observable<User> user = userService.getUser(...); //this is executed using Schedulers.computation() inside Service implementation
user.subscribe(new Observer<User>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
//handle error using ExceptionMappers
}
@Override
public void onNext(User user) {
asyncResponse.resume(user);
}});
}
}
c) DAO 内部的任何 IO 操作都将在单独的线程中使用 Schedulers.io() 到 运行 这些长时间处理操作。
我的问题是:
- 在实现 DAOs/Services 时,我是否应该在实现中隐藏正在使用的调度程序(线程)。
例如道:
public interface UserDao {
public Observable<User> getUser();
}
在实施中,按如下方式指定计划是否是一种好的做法;
public Observable<User> getUser() {
Observable<User> ret = Observable.create((subscriber)->{
try {
//Do DB call
User u = null;
subscriber.onNext(u);
subscriber.onCompleted();
}catch (Exception e) {
subscriber.onError(e);
}
});
return ret.subscribeOn(Schedulers.io());
}
还是直接returnObservable,上层会相应的使用一个特定的Schedular更好?
由于 DAO 主要涉及 io/network 调用,我认为应该使用 Schedulars.io()。
服务层的业务逻辑怎么样?它们应该在 Schedulers.computation() (事件循环)中执行吗?
里面有两个线程池 JVM.One 一个是 "Container Thread Pool" 另一个是 "RxThread Pool" 被 Schedulers.io() 使用。
如何配置RxJava的pool settings/size ?
1) 在 RxJava 本身,如果一个方法需要一个调度器,我们创建两个重载:一个没有 Scheduler 参数,一个有它。然后前者使用合理的默认调度程序委托给后者。这样,API 消费者可能会选择接受默认设置或自行选择。
2) 这取决于你的计算。如果计算花费的时间与等待 IO 的时间相似,您可以将计算移至计算调度程序中,从而释放 IO 中缓存的工作线程以执行更多阻塞。否则,您可以只在同一个调度程序上执行业务逻辑。
3) 目前您无法在 RxJava 中配置池大小。计算将始终使用 Runtime.availableProcessors()
并且 IO 将始终充当无界缓存线程池。如果您可以忍受事件线程跳跃(意思是:它们保证是串行的,但一个事件可能在线程 1 上执行,随后在线程 2 上执行),您可以通过 Schedulers.from()
.[=12 使用自己的 ExecutorServices =]
我们正在使用反应式编程制作 REST 原型 API。 如图所示,我们保持 3 层与之前同步 API 设计中使用的相同;
- API 使用 Jersey2 实现的层将处理 request/deserialize JSON 并移交给服务层。
- 实现业务的服务层-logic.Implemented 使用反应式编程 (RxJava)
- 服务用于持久化操作的 Dao 层Layer.Since我们使用 CouchBase,这将使用 CouchBase RxClient。
据我了解流程如下:
a) HTTP请求来了,Jersery会在"Container Thread pool".[=的RequestThread里面处理request/parse JSON/deserialize请求模型16=]
b)有了Jersey2 Async支持,RequestThread会return回到容器线程池,Service Layer会在Schedulers.computation( ) 调度程序。
@Path("/resource")
public class AsyncUserResource {
@GET
public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
Observable<User> user = userService.getUser(...); //this is executed using Schedulers.computation() inside Service implementation
user.subscribe(new Observer<User>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
//handle error using ExceptionMappers
}
@Override
public void onNext(User user) {
asyncResponse.resume(user);
}});
}
}
c) DAO 内部的任何 IO 操作都将在单独的线程中使用 Schedulers.io() 到 运行 这些长时间处理操作。
我的问题是:
- 在实现 DAOs/Services 时,我是否应该在实现中隐藏正在使用的调度程序(线程)。
例如道:
public interface UserDao {
public Observable<User> getUser();
}
在实施中,按如下方式指定计划是否是一种好的做法;
public Observable<User> getUser() {
Observable<User> ret = Observable.create((subscriber)->{
try {
//Do DB call
User u = null;
subscriber.onNext(u);
subscriber.onCompleted();
}catch (Exception e) {
subscriber.onError(e);
}
});
return ret.subscribeOn(Schedulers.io());
}
还是直接returnObservable,上层会相应的使用一个特定的Schedular更好?
由于 DAO 主要涉及 io/network 调用,我认为应该使用 Schedulars.io()。 服务层的业务逻辑怎么样?它们应该在 Schedulers.computation() (事件循环)中执行吗?
里面有两个线程池 JVM.One 一个是 "Container Thread Pool" 另一个是 "RxThread Pool" 被 Schedulers.io() 使用。 如何配置RxJava的pool settings/size ?
1) 在 RxJava 本身,如果一个方法需要一个调度器,我们创建两个重载:一个没有 Scheduler 参数,一个有它。然后前者使用合理的默认调度程序委托给后者。这样,API 消费者可能会选择接受默认设置或自行选择。
2) 这取决于你的计算。如果计算花费的时间与等待 IO 的时间相似,您可以将计算移至计算调度程序中,从而释放 IO 中缓存的工作线程以执行更多阻塞。否则,您可以只在同一个调度程序上执行业务逻辑。
3) 目前您无法在 RxJava 中配置池大小。计算将始终使用 Runtime.availableProcessors()
并且 IO 将始终充当无界缓存线程池。如果您可以忍受事件线程跳跃(意思是:它们保证是串行的,但一个事件可能在线程 1 上执行,随后在线程 2 上执行),您可以通过 Schedulers.from()
.[=12 使用自己的 ExecutorServices =]