如何在 rxJava 中订阅不同的线程池
How to subscribe in rxJava to a different thread pool
我现在运行正在我的程序中加入这一点
Observable.from(constituents.entrySet()).subscribeOn(Schedulers.newThread())
.flatMap(Async.toAsync((Map.Entry<String, ConstituentInfo> entry) -> {
logger.info(Thread.currentThread().getName());
ConstituentInfo constituent = entry.getValue();
String securityBySymbol = Redux.getSecurityBySymbol(entry.getKey());
String company = UtilityMethods.getNestedJsonObject(securityBySymbol, "company");
Integer compId = UtilityMethods.getIntegerFromJsonObject(company, "id");
String companyName = UtilityMethods.getStringFromJsonObject(company, "name");
String tier = UtilityMethods.getNestedJsonObject(securityBySymbol, "tier");
String tierId = UtilityMethods.getStringFromJsonObject(tier, "id");
String marketPlace = UtilityMethods.getStringFromJsonObject(tier, "name");
String countryName = getCountryName(compId);
constituent.setCompanyName(StringUtils.isBlank(companyName) ? NA : companyName);
constituent.setMarketPlace(StringUtils.isBlank(marketPlace) ? NA : marketPlace);
constituent.setCountryName(StringUtils.isBlank(countryName) ? NA : countryName);
constituent.setTierId(StringUtils.isBlank(tierId) ? NA : tierId);
return constituent;
})).subscribeOn(Schedulers.newThread())
.toList()
.timeout(30, TimeUnit.MINUTES)
.toBlocking()
.single();
和它 运行 并发,但它 运行 在 RxComputationThreadPool 上。我想知道如何在 Schedulers.newThread() 上使它成为 运行,以及它是否会提高性能。
或者,如果它不会提高性能,有没有办法让下面的代码 运行 更快?
有一个 toAsync
的重载需要一个 Scheduler
而你不需要 subscribeOn
。 computation()
调度器是所有调度器中延迟最低的。 io()
很可能并且 newThread()
肯定会启动一个新线程,因此可能需要几百微秒来执行第一个任务,但它们非常适合阻塞 I/O 或这种延迟不存在的网络调用这真的很重要。
我现在运行正在我的程序中加入这一点
Observable.from(constituents.entrySet()).subscribeOn(Schedulers.newThread())
.flatMap(Async.toAsync((Map.Entry<String, ConstituentInfo> entry) -> {
logger.info(Thread.currentThread().getName());
ConstituentInfo constituent = entry.getValue();
String securityBySymbol = Redux.getSecurityBySymbol(entry.getKey());
String company = UtilityMethods.getNestedJsonObject(securityBySymbol, "company");
Integer compId = UtilityMethods.getIntegerFromJsonObject(company, "id");
String companyName = UtilityMethods.getStringFromJsonObject(company, "name");
String tier = UtilityMethods.getNestedJsonObject(securityBySymbol, "tier");
String tierId = UtilityMethods.getStringFromJsonObject(tier, "id");
String marketPlace = UtilityMethods.getStringFromJsonObject(tier, "name");
String countryName = getCountryName(compId);
constituent.setCompanyName(StringUtils.isBlank(companyName) ? NA : companyName);
constituent.setMarketPlace(StringUtils.isBlank(marketPlace) ? NA : marketPlace);
constituent.setCountryName(StringUtils.isBlank(countryName) ? NA : countryName);
constituent.setTierId(StringUtils.isBlank(tierId) ? NA : tierId);
return constituent;
})).subscribeOn(Schedulers.newThread())
.toList()
.timeout(30, TimeUnit.MINUTES)
.toBlocking()
.single();
和它 运行 并发,但它 运行 在 RxComputationThreadPool 上。我想知道如何在 Schedulers.newThread() 上使它成为 运行,以及它是否会提高性能。
或者,如果它不会提高性能,有没有办法让下面的代码 运行 更快?
有一个 toAsync
的重载需要一个 Scheduler
而你不需要 subscribeOn
。 computation()
调度器是所有调度器中延迟最低的。 io()
很可能并且 newThread()
肯定会启动一个新线程,因此可能需要几百微秒来执行第一个任务,但它们非常适合阻塞 I/O 或这种延迟不存在的网络调用这真的很重要。