将 Mongo 集合中的几个 Flux 合并为一个
Merging several Flux into one from Mongo Collection
我对 spring 响应式编程有点陌生。我目前正在使用 spring-boot-starter-webflux:2.0.0.M5.
开发应用程序
我有一个 Foodtruck mongodb 模型,其中包含菜单:
@Document(collection = "Foodtruck")
@lombok.Getter
@lombok.Setter
@lombok.NoArgsConstructor
@lombok.AllArgsConstructor
public class Foodtruck {
@Id
private String id;
@URL
private String siteUrl;
@NotBlank
private String name;
private String description;
@NotBlank
private String address;
private List<Menu> menus;
@JsonIgnore
private GridFS image;
}
这是菜单的模型:
@lombok.Getter
@lombok.Setter
@lombok.NoArgsConstructor
@lombok.AllArgsConstructor
@Document(collection = "menus")
public class Menu {
@Id
private String id;
private List<DayOfWeek> days;
@NotBlank
private String label;
private String description;
private Double price;
private List<Dish> dishes;
@JsonIgnore
private GridFS image;
}
要获取我所有的菜单,我首先需要获取我所有的餐厅,然后合并所有获得的 Flux,并通过我的 rest controller return 它们。
这是我从我的休息资源调用的服务:
@Service
public class MenuServiceImpl implements MenuService {
FoodtruckService foodtruckService;
public MenuServiceImpl(FoodtruckService foodtruckService) {
this.foodtruckService = foodtruckService;
}
@Override
public Flux<Menu> getAllMenus() {
Flux<Menu> allMenuFlux = Flux.empty();
Flux<Foodtruck> foodtruckFlux = foodtruckService.getAllFoodtrucks();
foodtruckFlux.toStream().forEach(foodtruck -> {
Flux<Menu> currentMenuFlux = Flux.fromIterable(foodtruck.getMenus());
allMenuFlux.mergeWith(currentMenuFlux);
});
return allMenuFlux;
}
}
mergeWith 似乎没有向 allMenuFlux 添加任何内容。我想我在这里有一个理解问题。
我已经阅读了文档,测试了其他方法,如 concat 或 zip,但它没有像我希望的那样交错通量事件。我无法正确合并这些 Flux。
我还认为有一种更好的方法可以通过 menu 存储库获取 mongo 嵌入式文档,因为我的方法似乎有点矫枉过正,并且在很长一段时间内会导致性能问题运行。我试过了,还是不行。
编辑:
尝试以下代码后(确保我的列表不为空),生成的 fluxtest3 变量被正确合并:
Flux<Menu> allMenuFlux = Flux.empty();
Flux<Foodtruck> foodtruckFlux = foodtruckService.getAllFoodtrucks();
List<Foodtruck> test = foodtruckFlux.collectList().block();
Flux<Menu> fluxtest1 = Flux.fromIterable(test.get(0).getMenus());
Flux<Menu> fluxtest2 = Flux.fromIterable(test.get(1).getMenus());
Flux<Menu> fluxtest3 = fluxtest1.mergeWith(fluxtest2);
但这并不是我想要的。为什么它不能使用空助焊剂作为父助焊剂。
我在这里错过了什么?
在此先感谢您的帮助。
我认为这里存在一些误解。
如果您曾经将 Flux
转换为 Collection
、Stream
或类似的,然后从中获得的东西返回到 Flux
,您肯定是做错了什么。这些 类 强制您的管道收集多个元素,然后处理它们,然后将它们转换回 Flux
。在几乎所有情况下,这应该仅通过 Flux
.
提供的操作就可以实现
Flux
上的方法不会更改 Flux
,但会创建具有额外行为的新实例。所以如果你有这样的代码:
Flux<Menu> allMenuFlux = Flux.empty();
Flux<Foodtruck> foodtruckFlux = foodtruckService.getAllFoodtrucks();
foodtruckFlux.toStream().forEach(foodtruck -> {
Flux<Menu> currentMenuFlux = Flux.fromIterable(foodtruck.getMenus());
allMenuFlux.mergeWith(currentMenuFlux);
});
return allMenuFlux;
每次调用 allMenuFlux.mergeWith(currentMenuFlux);
时都会创建一个新的 Flux
,以便垃圾收集器可以处理它。 allMenuFlux
仍然是您开始时使用的空 `Flux。
你真正想要的似乎是:
return foodtruckService.getAllFoodtrucks()
.flatMap(Foodtruck::getMenus);
请参阅 flatMap
的文档。 flatMap
和 mergeWith
的区别在于 mergeWith
保留了原始元素。如果在您的用例中有 none,这是多余的。
奖金:您的附加问题
Flux<Menu> fluxtest1 = Flux.fromIterable(test.get(0).getMenus());
Flux<Menu> fluxtest2 = Flux.fromIterable(test.get(1).getMenus());
Flux<Menu> fluxtest3 = fluxtest1.mergeWith(fluxtest2);
这里您不是 return 原始 fluxtest1
而是新生成的 fluxtest2
。因此它确实有效。
我对 spring 响应式编程有点陌生。我目前正在使用 spring-boot-starter-webflux:2.0.0.M5.
开发应用程序我有一个 Foodtruck mongodb 模型,其中包含菜单:
@Document(collection = "Foodtruck")
@lombok.Getter
@lombok.Setter
@lombok.NoArgsConstructor
@lombok.AllArgsConstructor
public class Foodtruck {
@Id
private String id;
@URL
private String siteUrl;
@NotBlank
private String name;
private String description;
@NotBlank
private String address;
private List<Menu> menus;
@JsonIgnore
private GridFS image;
}
这是菜单的模型:
@lombok.Getter
@lombok.Setter
@lombok.NoArgsConstructor
@lombok.AllArgsConstructor
@Document(collection = "menus")
public class Menu {
@Id
private String id;
private List<DayOfWeek> days;
@NotBlank
private String label;
private String description;
private Double price;
private List<Dish> dishes;
@JsonIgnore
private GridFS image;
}
要获取我所有的菜单,我首先需要获取我所有的餐厅,然后合并所有获得的 Flux,并通过我的 rest controller return 它们。
这是我从我的休息资源调用的服务:
@Service
public class MenuServiceImpl implements MenuService {
FoodtruckService foodtruckService;
public MenuServiceImpl(FoodtruckService foodtruckService) {
this.foodtruckService = foodtruckService;
}
@Override
public Flux<Menu> getAllMenus() {
Flux<Menu> allMenuFlux = Flux.empty();
Flux<Foodtruck> foodtruckFlux = foodtruckService.getAllFoodtrucks();
foodtruckFlux.toStream().forEach(foodtruck -> {
Flux<Menu> currentMenuFlux = Flux.fromIterable(foodtruck.getMenus());
allMenuFlux.mergeWith(currentMenuFlux);
});
return allMenuFlux;
}
}
mergeWith 似乎没有向 allMenuFlux 添加任何内容。我想我在这里有一个理解问题。
我已经阅读了文档,测试了其他方法,如 concat 或 zip,但它没有像我希望的那样交错通量事件。我无法正确合并这些 Flux。 我还认为有一种更好的方法可以通过 menu 存储库获取 mongo 嵌入式文档,因为我的方法似乎有点矫枉过正,并且在很长一段时间内会导致性能问题运行。我试过了,还是不行。
编辑: 尝试以下代码后(确保我的列表不为空),生成的 fluxtest3 变量被正确合并:
Flux<Menu> allMenuFlux = Flux.empty();
Flux<Foodtruck> foodtruckFlux = foodtruckService.getAllFoodtrucks();
List<Foodtruck> test = foodtruckFlux.collectList().block();
Flux<Menu> fluxtest1 = Flux.fromIterable(test.get(0).getMenus());
Flux<Menu> fluxtest2 = Flux.fromIterable(test.get(1).getMenus());
Flux<Menu> fluxtest3 = fluxtest1.mergeWith(fluxtest2);
但这并不是我想要的。为什么它不能使用空助焊剂作为父助焊剂。
我在这里错过了什么?
在此先感谢您的帮助。
我认为这里存在一些误解。
如果您曾经将
Flux
转换为Collection
、Stream
或类似的,然后从中获得的东西返回到Flux
,您肯定是做错了什么。这些 类 强制您的管道收集多个元素,然后处理它们,然后将它们转换回Flux
。在几乎所有情况下,这应该仅通过Flux
. 提供的操作就可以实现
Flux
上的方法不会更改Flux
,但会创建具有额外行为的新实例。所以如果你有这样的代码:Flux<Menu> allMenuFlux = Flux.empty(); Flux<Foodtruck> foodtruckFlux = foodtruckService.getAllFoodtrucks(); foodtruckFlux.toStream().forEach(foodtruck -> { Flux<Menu> currentMenuFlux = Flux.fromIterable(foodtruck.getMenus()); allMenuFlux.mergeWith(currentMenuFlux); });
return allMenuFlux;
每次调用
allMenuFlux.mergeWith(currentMenuFlux);
时都会创建一个新的Flux
,以便垃圾收集器可以处理它。allMenuFlux
仍然是您开始时使用的空 `Flux。你真正想要的似乎是:
return foodtruckService.getAllFoodtrucks() .flatMap(Foodtruck::getMenus);
请参阅
flatMap
的文档。flatMap
和mergeWith
的区别在于mergeWith
保留了原始元素。如果在您的用例中有 none,这是多余的。
奖金:您的附加问题
Flux<Menu> fluxtest1 = Flux.fromIterable(test.get(0).getMenus());
Flux<Menu> fluxtest2 = Flux.fromIterable(test.get(1).getMenus());
Flux<Menu> fluxtest3 = fluxtest1.mergeWith(fluxtest2);
这里您不是 return 原始 fluxtest1
而是新生成的 fluxtest2
。因此它确实有效。