将 Mongo 集合中的几个 Flux 合并为一个

Merging several Flux into one from Mongo Collection

我对 spring 响应式编程有点陌生。我目前正在使用 spring-boot-starter-webflux:2.0.0.M5.

开发应用程序

我有一个 Foodtruck mongodb 模型,其中包含菜单:

@Document(collection = "Foodtruck")
@lombok.Getter
@lombok.Setter
@lombok.NoArgsConstructor
@lombok.AllArgsConstructor
public class Foodtruck {
  @Id
  private String id;

  @URL
  private String siteUrl;

  @NotBlank
  private String name;

  private String description;

  @NotBlank
  private String address;

  private List<Menu> menus;

  @JsonIgnore
  private GridFS image;
}

这是菜单的模型:

@lombok.Getter
@lombok.Setter
@lombok.NoArgsConstructor
@lombok.AllArgsConstructor
@Document(collection = "menus")
public class Menu {

  @Id
  private String id;

  private List<DayOfWeek> days;

  @NotBlank
  private String label;

  private String description;

  private Double price;

  private List<Dish> dishes;

  @JsonIgnore
  private GridFS image;
}

要获取我所有的菜单,我首先需要获取我所有的餐厅,然后合并所有获得的 Flux,并通过我的 rest controller return 它们。

这是我从我的休息资源调用的服务:

@Service
public class MenuServiceImpl implements MenuService {

  FoodtruckService foodtruckService;

  public MenuServiceImpl(FoodtruckService foodtruckService) {
    this.foodtruckService = foodtruckService;
  }

  @Override
  public Flux<Menu> getAllMenus() {
    Flux<Menu> allMenuFlux = Flux.empty();
    Flux<Foodtruck> foodtruckFlux = foodtruckService.getAllFoodtrucks();

    foodtruckFlux.toStream().forEach(foodtruck -> {
      Flux<Menu> currentMenuFlux = Flux.fromIterable(foodtruck.getMenus());
      allMenuFlux.mergeWith(currentMenuFlux);
    });

    return allMenuFlux;
  }
}

mergeWith 似乎没有向 allMenuFlux 添加任何内容。我想我在这里有一个理解问题。

我已经阅读了文档,测试了其他方法,如 concat 或 zip,但它没有像我希望的那样交错通量事件。我无法正确合并这些 Flux。 我还认为有一种更好的方法可以通过 menu 存储库获取 mongo 嵌入式文档,因为我的方法似乎有点矫枉过正,并且在很长一段时间内会导致性能问题运行。我试过了,还是不行。

编辑: 尝试以下代码后(确保我的列表不为空),生成的 fluxtest3 变量被正确合并:

Flux<Menu> allMenuFlux = Flux.empty();
Flux<Foodtruck> foodtruckFlux = foodtruckService.getAllFoodtrucks();

List<Foodtruck> test = foodtruckFlux.collectList().block();

Flux<Menu> fluxtest1 = Flux.fromIterable(test.get(0).getMenus());
Flux<Menu> fluxtest2 = Flux.fromIterable(test.get(1).getMenus());
Flux<Menu> fluxtest3 = fluxtest1.mergeWith(fluxtest2);

但这并不是我想要的。为什么它不能使用空助焊剂作为父助焊剂。

我在这里错过了什么?

在此先感谢您的帮助。

我认为这里存在一些误解。

  1. 如果您曾经将 Flux 转换为 CollectionStream 或类似的,然后从中获得的东西返回到 Flux,您肯定是做错了什么。这些 类 强制您的管道收集多个元素,然后处理它们,然后将它们转换回 Flux。在几乎所有情况下,这应该仅通过 Flux.

  2. 提供的操作就可以实现
  3. Flux 上的方法不会更改 Flux,但会创建具有额外行为的新实例。所以如果你有这样的代码:

    Flux<Menu> allMenuFlux = Flux.empty();
    Flux<Foodtruck> foodtruckFlux = foodtruckService.getAllFoodtrucks();
    
    foodtruckFlux.toStream().forEach(foodtruck -> {
        Flux<Menu> currentMenuFlux = Flux.fromIterable(foodtruck.getMenus());
        allMenuFlux.mergeWith(currentMenuFlux);
    });
    

    return allMenuFlux;

    每次调用 allMenuFlux.mergeWith(currentMenuFlux); 时都会创建一个新的 Flux,以便垃圾收集器可以处理它。 allMenuFlux 仍然是您开始时使用的空 `Flux。

  4. 你真正想要的似乎是:

    return foodtruckService.getAllFoodtrucks()
        .flatMap(Foodtruck::getMenus);
    

    请参阅 flatMap 的文档。 flatMapmergeWith 的区别在于 mergeWith 保留了原始元素。如果在您的用例中有 none,这是多余的。

奖金:您的附加问题

Flux<Menu> fluxtest1 = Flux.fromIterable(test.get(0).getMenus());
Flux<Menu> fluxtest2 = Flux.fromIterable(test.get(1).getMenus());
Flux<Menu> fluxtest3 = fluxtest1.mergeWith(fluxtest2);

这里您不是 return 原始 fluxtest1 而是新生成的 fluxtest2。因此它确实有效。