How to parallelize database queries in Spring Flux?
I want to expose an aggregated result from a mysql database as a Flux<JSONObject> stream in Spring.
@RestController
public class FluxController {
    @GetMapping(value = "/", produces = TEXT_EVENT_STREAM_VALUE)
    public Flux<JSONObject> stream() {
        return service.getJson();
    }
}
@Service
public class DatabaseService {
    public List<JSONObject> getJson() {
        List<Long> refs = jdbc.queryForList(...);
        MapSqlParameterSource params = new MapSqlParameterSource();
        params.addValue("refs", refs);
        //of course real world sql is much more complex
        Map<Long, Product> products = jdbc.query("SELECT * FROM products WHERE ref IN (:refs)", params);
        Map<Long, Item> items = jdbc.query("SELECT * FROM items WHERE ref IN (:refs)", params);
        Map<Long, Warehouse> warehouses = jdbc.query("SELECT * FROM warehouses WHERE ref IN (:refs)", params);
        List<JSONObject> results = new ArrayList<>();
        for (Long ref : refs) {
            JSONObject json = new JSONObject();
            json.put("ref", ref);
            json.put("product", products.get(ref));
            json.put("item", items.get(ref));
            json.put("warehouse", warehouses.get(ref));
            results.add(json);
        }
        return results;
    }
}
Now I want to convert this into a Flux and expose it as an event stream. But how can I parallelize the database lookups and chain them together into one Flux?
public Flux<JSONObject> getJsonFlux() {
    //I need this as source
    List<Long> refs = jdbc.queryForList(...);
    return Flux.fromIterable(refs).map(ref -> {
        //TODO how to aggregate the different database calls concurrently?
        //and then expose each JSONObject one by one into the stream as soon as it is built?
    });
}
Side note: I know this will still block. But in my real application I am applying pagination and chunking, so each chunk will be exposed to the stream when it is ready.

The main problem, then, is that I don't know how to parallelize the lookups and then aggregate/merge the results, e.g. in a final flux step.
If I understand well, you would like to execute the queries by passing all refs as a parameter.

It won't really be an event stream this way: it waits until all queries are finished and all json objects are in memory, and only starts streaming them after that.
public Flux<JSONObject> getJsonFlux()
{
    return Mono.fromCallable(jdbc::queryForList)
               .subscribeOn(Schedulers.elastic()) // elastic thread pool meant for blocking IO, you can use a custom one
               .flatMap(this::queryEntities)
               .map(this::createJsonObjects)
               .flatMapMany(Flux::fromIterable);
}

private Mono<Tuple4<List<Long>, List<Product>, List<Item>, List<Warehouse>>> queryEntities(List<Long> refs)
{
    Mono<List<Product>> products = Mono.fromCallable(() -> jdbc.queryProducts(refs)).subscribeOn(Schedulers.elastic());
    Mono<List<Item>> items = Mono.fromCallable(() -> jdbc.queryItems(refs)).subscribeOn(Schedulers.elastic());
    Mono<List<Warehouse>> warehouses = Mono.fromCallable(() -> jdbc.queryWarehouses(refs)).subscribeOn(Schedulers.elastic());
    return Mono.zip(Mono.just(refs), products, items, warehouses); // query calls will be concurrent
}

private List<JSONObject> createJsonObjects(Tuple4<List<Long>, List<Product>, List<Item>, List<Warehouse>> tuple)
{
    List<Long> refs = tuple.getT1();
    List<Product> products = tuple.getT2();
    List<Item> items = tuple.getT3();
    List<Warehouse> warehouses = tuple.getT4();
    List<JSONObject> jsonObjects = new ArrayList<>();
    for (Long ref : refs)
    {
        JSONObject json = new JSONObject();
        // build json object here
        jsonObjects.add(json);
    }
    return jsonObjects;
}
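A minimal, self-contained sketch of what the subscribeOn calls above do, assuming reactor-core is on the classpath. It uses Schedulers.boundedElastic(), the replacement for the deprecated Schedulers.elastic() in Reactor 3.4+, and the Callable is a stand-in for a blocking JDBC call:

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnDemo {
    public static void main(String[] args) {
        // The Callable simulates blocking JDBC work; subscribeOn moves its
        // execution off the caller's thread onto a boundedElastic worker.
        String workerThread = Mono.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.boundedElastic())
                .block();
        System.out.println(workerThread.startsWith("boundedElastic"));
    }
}
```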
The other approach is to query the entities for each ref separately. That way each JSONObject is queried individually, and they can interleave in the stream. I'm not sure how the database handles that kind of load; that's something you should consider.
public Flux<JSONObject> getJsonFlux()
{
    return Mono.fromCallable(jdbc::queryForList)
               .flatMapMany(Flux::fromIterable)
               .subscribeOn(Schedulers.elastic()) // elastic thread pool meant for blocking IO, you can use a custom one
               .flatMap(this::queryEntities)
               .map(this::createJsonObject);
}

private Mono<Tuple4<Long, Product, Item, Warehouse>> queryEntities(Long ref)
{
    Mono<Product> product = Mono.fromCallable(() -> jdbc.queryProduct(ref)).subscribeOn(Schedulers.elastic());
    Mono<Item> item = Mono.fromCallable(() -> jdbc.queryItem(ref)).subscribeOn(Schedulers.elastic());
    Mono<Warehouse> warehouse = Mono.fromCallable(() -> jdbc.queryWarehouse(ref)).subscribeOn(Schedulers.elastic());
    return Mono.zip(Mono.just(ref), product, item, warehouse); // query calls will be concurrent
}

private JSONObject createJsonObject(Tuple4<Long, Product, Item, Warehouse> tuple)
{
    Long ref = tuple.getT1();
    Product product = tuple.getT2();
    Item item = tuple.getT3();
    Warehouse warehouse = tuple.getT4();
    JSONObject json = new JSONObject();
    // build json object here
    return json;
}
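That "query calls will be concurrent" comment can be checked with a small timing sketch, assuming reactor-core is available. The sleeps stand in for the JDBC calls; because each Mono is subscribed on its own boundedElastic worker, Mono.zip completes in roughly the time of one call, not three (the 550 ms threshold is deliberately generous):

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ZipConcurrencyDemo {
    // Simulates one blocking ~200ms database lookup on a worker thread.
    static Mono<String> slow(String name) {
        return Mono.fromCallable(() -> { Thread.sleep(200); return name; })
                   .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // zip subscribes to all three sources eagerly, so they run in parallel
        String joined = Mono.zip(slow("product"), slow("item"), slow("warehouse"))
                            .map(t -> t.getT1() + "," + t.getT2() + "," + t.getT3())
                            .block();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(joined);
        System.out.println(elapsedMs < 550); // ~200ms, not 600ms, if the calls really ran concurrently
    }
}
```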
The idea is to fetch the full list of refs first, and then concurrently fetch the products, items and warehouses - I call this Tuple3 lookups. Then each ref is combined with the lookups and converted into a JSONObject one by one.
return Mono.fromCallable(jdbc::queryForList) //fetches refs
           .subscribeOn(Schedulers.elastic())
           .flatMapMany(refList -> { //flatMapMany allows to convert Mono to Flux in flatMap operation
               Flux<Tuple3<Map<Long, Product>, Map<Long, Item>, Map<Long, Warehouse>>> lookups =
                       Mono.zip(fetchProducts(refList), fetchItems(refList), fetchWarehouses(refList))
                           .cache().repeat(); //notice cache - it makes sure that Mono.zip is executed only once, not for each zipWith call
               return Flux.fromIterable(refList)
                          .zipWith(lookups);
           })
           .map(t -> {
               Long ref = t.getT1();
               Tuple3<Map<Long, Product>, Map<Long, Item>, Map<Long, Warehouse>> lookups = t.getT2();
               JSONObject json = new JSONObject();
               json.put("ref", ref);
               json.put("product", lookups.getT1().get(ref));
               json.put("item", lookups.getT2().get(ref));
               json.put("warehouse", lookups.getT3().get(ref));
               return json;
           });
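The cache().repeat() trick can be demonstrated in isolation, assuming reactor-core is on the classpath. Here a counting Callable stands in for the expensive zipped lookups; the counter shows the source runs only once, even though every element of the Flux is zipped with it:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class CacheRepeatDemo {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        List<Long> refs = List.of(1L, 2L, 3L);
        // Stand-in for the expensive Mono.zip of lookup queries.
        Mono<Map<Long, String>> lookup = Mono.fromCallable(() -> {
            calls.incrementAndGet();
            return Map.of(1L, "a", 2L, "b", 3L, "c");
        });
        // cache() memoizes the result; repeat() re-emits it for each zipped element.
        List<String> result = Flux.fromIterable(refs)
                .zipWith(lookup.cache().repeat())
                .map(t -> t.getT1() + "=" + t.getT2().get(t.getT1()))
                .collectList()
                .block();
        System.out.println(result);
        System.out.println(calls.get()); // executed once, not once per element
    }
}
```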
The methods for each database call:
Mono<Map<Long, Product>> fetchProducts(List<Long> refs) {
    return Mono.fromCallable(() -> jdbc.query("SELECT * FROM products WHERE ref IN (:refs)", params))
               .subscribeOn(Schedulers.elastic());
}

Mono<Map<Long, Item>> fetchItems(List<Long> refs) {
    return Mono.fromCallable(() -> jdbc.query("SELECT * FROM items WHERE ref IN (:refs)", params))
               .subscribeOn(Schedulers.elastic());
}

Mono<Map<Long, Warehouse>> fetchWarehouses(List<Long> refs) {
    return Mono.fromCallable(() -> jdbc.query("SELECT * FROM warehouses WHERE ref IN (:refs)", params))
               .subscribeOn(Schedulers.elastic());
}
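These fetch methods have to turn the JDBC result into a Map keyed by ref. The grouping step itself can be sketched with plain collections; the Product record and the row list here are hypothetical stand-ins for the real row mapping:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class LookupMapDemo {
    // Hypothetical stand-in for the real Product row type.
    record Product(long ref, String name) {}

    public static void main(String[] args) {
        // Rows as they might come back from jdbc.query(...).
        List<Product> rows = List.of(new Product(1L, "bolt"), new Product(2L, "nut"));
        // Index the rows by ref so lookups.getT1().get(ref) works in the stream step.
        Map<Long, Product> byRef = rows.stream()
                .collect(Collectors.toMap(Product::ref, Function.identity()));
        System.out.println(byRef.get(1L).name());
        System.out.println(byRef.get(2L).name());
    }
}
```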
Why do I need subscribeOn?

I put it there for two reasons:

It allows the database query to be executed on a dedicated thread pool, which prevents it from blocking the main threads: https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking

It allows truly parallelizing Mono.zip. See this answer; it is about flatMap, but it also applies to zip:
For completeness, using .flatMap() on the zip result is also possible. Though I'm not sure whether .cache() is still necessary here.
.flatMapMany(refList ->
        Mono.zip(fetchProducts(refList), fetchItems(refList), fetchWarehouses(refList)).cache()
            .flatMapMany(tuple -> Flux.fromIterable(refList).map(refId -> Tuples.of(refId, tuple))))
.map(tuple -> {
    Long refId = tuple.getT1();
    Tuple3<Map<Long, Product>, Map<Long, Item>, Map<Long, Warehouse>> lookups = tuple.getT2();
    JSONObject json = new JSONObject();
    json.put("ref", refId);
    json.put("product", lookups.getT1().get(refId));
    json.put("item", lookups.getT2().get(refId));
    json.put("warehouse", lookups.getT3().get(refId));
    return json;
});