How to sum multiple values in a Flux?

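I have expense data. I need a model that contains the total expense amount, the total paid expense amount, and the list of expenses. For this, I wrote a function named createExpenseSummaryReport:
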
List<ExpenseModel> expensesModel = Arrays.asList(
        new ExpenseModel("Expense1", 121.687, 31.89),
        new ExpenseModel("Expense2", 78.03, 72.5),
        new ExpenseModel("Expense3", 102.014, 102.014),
        new ExpenseModel("Expense4", 100.0, 45.42)
    );
 
....
 public static ExpenseReportSummary createExpenseSummaryReport(List<ExpenseModel> expensesModel) {
    BigDecimal totalExpense = BigDecimal.ZERO;
    BigDecimal totalPaidExpense = BigDecimal.ZERO;
    for (ExpenseModel expense : expensesModel) {
      totalExpense = totalExpense.add(BigDecimal.valueOf(expense.getAmount()));
      totalPaidExpense = totalPaidExpense.add(BigDecimal.valueOf(expense.getPaidAmount()));
    }
    return new ExpenseReportSummary(expensesModel, totalExpense, totalPaidExpense);
  }

In the real project, I have a repository named IncomeExpenseRepository. This repository has a method that returns a Flux<IncomeExpense>:

Flux<IncomeExpense> byDateContaining = incomeExpenseRepository.findAllByDateContaining(
        request.getPeriod());

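I need a reactive version of the createExpenseSummaryReport service. The report should include the income/expense list, the total income amount, and the total paid amount. To that end, I am trying to complete the following function: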

@Override
public Mono<IncomeExpenseSummaryReport> getIncomeExpenseReportByPeriod(
GetIncomeExpenseReportRequest request) {

    AtomicReference<BigDecimal> totalExpenseAmount = new AtomicReference<>(BigDecimal.ZERO);
    AtomicReference<BigDecimal> totalPaidExpenseAmount = new AtomicReference<>(BigDecimal.ZERO);
    
    incomeExpenseRepository.findAllByDateContaining(
            request.getPeriod())
        .doOnEach(d -> {
          IncomeExpense incomeExpense = d.get();
          totalExpenseAmount.getAndSet(BigDecimal.valueOf(incomeExpense.getAmount()));
          if ("paid".equals(incomeExpense.getPaymentStatus())) {
            totalPaidExpenseAmount.getAndSet(BigDecimal.valueOf(incomeExpense.getAmount()));
          }
        });


    return Flux.empty();

  }

The intended approach is to use the Flux#reduce operator:

Flux.just(new ExpenseModel("Expense1", 121.687, 31.89),
        new ExpenseModel("Expense2", 78.03, 72.5),
        new ExpenseModel("Expense3", 102.014, 102.014),
        new ExpenseModel("Expense4", 100.0, 45.42))
    .reduce(new ExpenseReportSummary(BigDecimal.ZERO, BigDecimal.ZERO), ((expenseReportSummary, expenseModel) -> {
        expenseReportSummary.setTotalExpense(
            expenseReportSummary.getTotalExpense().add(BigDecimal.valueOf(expenseModel.getAmount())));
        expenseReportSummary.setTotalPaidExpense(
            expenseReportSummary.getTotalPaidExpense().add(BigDecimal.valueOf(expenseModel.getPaidAmount())));
        return expenseReportSummary;
    }))
    .doOnNext(System.out::println)

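The result is a Mono<ExpenseReportSummary>. Nothing is emitted until it is subscribed to (for example with .subscribe() or .block()).

Output: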

ExpenseReportSummary(totalExpense=401.731, totalPaidExpense=251.824)
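
For the IncomeExpense case from the question, where only entries with payment status "paid" count toward the paid total, the same reduction can be sketched with an immutable accumulator. This is a minimal sketch under assumptions, not the project's actual code: the IncomeExpense class below is a hypothetical stand-in for the real entity, and ExpenseTotals and ExpenseTotalsDemo are names invented for illustration. The fold is shown with a plain loop so that it runs without Reactor on the classpath; with Reactor, the same accumulator function could be passed as `flux.reduce(ExpenseTotals.ZERO, ExpenseTotals::plus)`.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the IncomeExpense entity from the question.
final class IncomeExpense {
    private final double amount;
    private final String paymentStatus;

    IncomeExpense(double amount, String paymentStatus) {
        this.amount = amount;
        this.paymentStatus = paymentStatus;
    }

    double getAmount() { return amount; }
    String getPaymentStatus() { return paymentStatus; }
}

// Hypothetical immutable accumulator: every step returns a new instance,
// so no state is shared between separate runs of the reduction.
final class ExpenseTotals {
    static final ExpenseTotals ZERO = new ExpenseTotals(BigDecimal.ZERO, BigDecimal.ZERO);

    final BigDecimal total;
    final BigDecimal paid;

    ExpenseTotals(BigDecimal total, BigDecimal paid) {
        this.total = total;
        this.paid = paid;
    }

    // Accumulator step: always add to the total, add to paid only when status is "paid".
    ExpenseTotals plus(IncomeExpense e) {
        BigDecimal amount = BigDecimal.valueOf(e.getAmount());
        BigDecimal newPaid = "paid".equals(e.getPaymentStatus()) ? paid.add(amount) : paid;
        return new ExpenseTotals(total.add(amount), newPaid);
    }

    @Override
    public String toString() {
        return "ExpenseTotals(total=" + total + ", paid=" + paid + ")";
    }
}

class ExpenseTotalsDemo {
    // Plain-Java left fold; with Reactor this would be
    // flux.reduce(ExpenseTotals.ZERO, ExpenseTotals::plus), yielding a Mono<ExpenseTotals>.
    static ExpenseTotals sum(List<IncomeExpense> items) {
        ExpenseTotals acc = ExpenseTotals.ZERO;
        for (IncomeExpense e : items) {
            acc = acc.plus(e);
        }
        return acc;
    }

    public static void main(String[] args) {
        // Made-up sample data: amounts reuse the figures above, statuses are invented.
        List<IncomeExpense> items = Arrays.asList(
            new IncomeExpense(121.687, "paid"),
            new IncomeExpense(78.03, "unpaid"),
            new IncomeExpense(102.014, "paid"),
            new IncomeExpense(100.0, "unpaid"));
        System.out.println(sum(items));
    }
}
```

Returning a new accumulator on each step, rather than mutating the initial value passed to reduce, avoids carrying totals over if the resulting Mono is subscribed to more than once. It also replaces the AtomicReference and doOnEach combination from the question, where getAndSet overwrote the running value instead of adding to it.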