RxJava 等效于简单的 ThreadPoolExecutor 示例
RxJava equivalent of simple ThreadPoolExecutor example
我已经离开 Java 游戏 ~8 年了,从那以后发生了很多变化。对我来说最大的挑战是 RxJava / reactive。我正在寻找有关如何以完全反应的方式执行以下等效操作的粗略指导。
下面使用 ThreadPoolExecutor 实现的基本要求是通过调用远程 Web 服务来处理大量 Stuff
,记录的速率限制为 100 requests/minute。我的目标是尽可能快地处理尽可能多的数据,不丢弃任何 Stuff
但仍然遵守下游速率限制。此代码已被简化以避免错误、隔板、断路器、重试逻辑等。
这段代码目前工作正常,但在所有非阻塞反应选项的情况下,它会导致感觉像是浪费了很多线程。甚至我用来调用我的服务的 HTTP 客户端也提供了一个 Flowable
,我只是在每个执行程序的 20 个线程中阻塞了它。
我很想了解响应式等效项应该是什么。我一直在努力的地方是我发现的几乎所有文档都使用 Observable 的静态源来展示(例如:Observable.fromArray(1,2,3,4,5)
)。我知道解决方案可能涉及 IoScheduler
,也许涉及 groupBy
,但我还没有弄清楚如何将来自我的 HTTP 客户端的 Flowable
合并到一些执行并行化的完整链中(向上限制,例如 20) 和速率限制。
public class Example {
private static final int THREADS = 20;
// using https://docs.micronaut.io/latest/guide/index.html#httpClient
@Client("http://stuff-processor.internal:8080")
@Inject
RxHttpClient httpClient;
private ThreadPoolExecutor executor;
private final RateLimiter rateLimiter;
public Example() {
// up to 20 threads to process the unbounded queue
// incoming Stuff is very bursty...
// ...we could go hours without anything and then hundreds could come in
this.executor = new ThreadPoolExecutor(THREADS, THREADS,
30,TimeUnit.SECONDS, new LinkedBlockingQueue<>());
this.executor.allowCoreThreadTimeOut(true);
// using https://resilience4j.readme.io/docs/ratelimiter
RateLimiterConfig config = RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(60))
.limitForPeriod(100)
.timeoutDuration(Duration.ofSeconds(90))
.build();
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
rateLimiter = rateLimiterRegistry.rateLimiter("stuff-processor", config);
}
/**
* Called when the user takes an action that can cause 1 or 1000s of new
* Stuff to be entered into the system. Each instance of Stuff results in
* a separate call to this method. Ex: 100 Stuffs = 100 calls.
*/
void onNewStuff(Stuff stuff) {
final Runnable task = () -> {
final Flowable<HttpResponse<Boolean>> flowable = httpClient.exchange(
HttpRequest.POST("/process", stuff),
Boolean.class);
final HttpResponse<Boolean> response = flowable.blockingFirst();
if (response.body()) {
System.out.println("Success!");
} else {
System.out.println("Fail :(");
}
};
final Runnable rateLimitedTask =
RateLimiter.decorateRunnable(rateLimiter, task);
executor.submit(rateLimitedTask);
}
}
谢谢!
首先,要以完全非阻塞的方式构建它,您需要使用像 Netty 这样的非阻塞、异步 HTTP 客户端库。我不确定 RxHttpClient
是如何工作的。
假设您有一个列表 stuff
。我会这样做:
Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io())).subscribe();
flatMap
合并响应。
为了限制速率,您 flatMap
有第二个参数,它限制了它并行订阅的内部流的数量。假设您想一次拨打不超过 10 个电话。这样做:
Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io()), 10).subscribe();
我已经离开 Java 游戏 ~8 年了,从那以后发生了很多变化。对我来说最大的挑战是 RxJava / reactive。我正在寻找有关如何以完全反应的方式执行以下等效操作的粗略指导。
下面使用 ThreadPoolExecutor 实现的基本要求是通过调用远程 Web 服务来处理大量 Stuff
,记录的速率限制为 100 requests/minute。我的目标是尽可能快地处理尽可能多的数据,不丢弃任何 Stuff
但仍然遵守下游速率限制。此代码已被简化以避免错误、隔板、断路器、重试逻辑等。
这段代码目前工作正常,但在所有非阻塞反应选项的情况下,它会导致感觉像是浪费了很多线程。甚至我用来调用我的服务的 HTTP 客户端也提供了一个 Flowable
,我只是在每个执行程序的 20 个线程中阻塞了它。
我很想了解响应式等效项应该是什么。我一直在努力的地方是我发现的几乎所有文档都使用 Observable 的静态源来展示(例如:Observable.fromArray(1,2,3,4,5)
)。我知道解决方案可能涉及 IoScheduler
,也许涉及 groupBy
,但我还没有弄清楚如何将来自我的 HTTP 客户端的 Flowable
合并到一些执行并行化的完整链中(向上限制,例如 20) 和速率限制。
public class Example {
private static final int THREADS = 20;
// using https://docs.micronaut.io/latest/guide/index.html#httpClient
@Client("http://stuff-processor.internal:8080")
@Inject
RxHttpClient httpClient;
private ThreadPoolExecutor executor;
private final RateLimiter rateLimiter;
public Example() {
// up to 20 threads to process the unbounded queue
// incoming Stuff is very bursty...
// ...we could go hours without anything and then hundreds could come in
this.executor = new ThreadPoolExecutor(THREADS, THREADS,
30,TimeUnit.SECONDS, new LinkedBlockingQueue<>());
this.executor.allowCoreThreadTimeOut(true);
// using https://resilience4j.readme.io/docs/ratelimiter
RateLimiterConfig config = RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(60))
.limitForPeriod(100)
.timeoutDuration(Duration.ofSeconds(90))
.build();
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
rateLimiter = rateLimiterRegistry.rateLimiter("stuff-processor", config);
}
/**
* Called when the user takes an action that can cause 1 or 1000s of new
* Stuff to be entered into the system. Each instance of Stuff results in
* a separate call to this method. Ex: 100 Stuffs = 100 calls.
*/
void onNewStuff(Stuff stuff) {
final Runnable task = () -> {
final Flowable<HttpResponse<Boolean>> flowable = httpClient.exchange(
HttpRequest.POST("/process", stuff),
Boolean.class);
final HttpResponse<Boolean> response = flowable.blockingFirst();
if (response.body()) {
System.out.println("Success!");
} else {
System.out.println("Fail :(");
}
};
final Runnable rateLimitedTask =
RateLimiter.decorateRunnable(rateLimiter, task);
executor.submit(rateLimitedTask);
}
}
谢谢!
首先,要以完全非阻塞的方式构建它,您需要使用像 Netty 这样的非阻塞、异步 HTTP 客户端库。我不确定 RxHttpClient
是如何工作的。
假设您有一个列表 stuff
。我会这样做:
Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io())).subscribe();
flatMap
合并响应。
为了限制速率,您 flatMap
有第二个参数,它限制了它并行订阅的内部流的数量。假设您想一次拨打不超过 10 个电话。这样做:
Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io()), 10).subscribe();