Flux处理时如何正确抛出异常?

How to throw an exception properly when do Flux processing?

我现有的代码:

private Flux<Integer> testGetFluxTestData() {
    return Flux.just(new TestData(1), new TestData(2))
            .collectList()
            .map(list -> list.stream()
                    .map(TestData::getId)
                    .collect(Collectors.toList()))
            .flatMapMany(Flux::fromIterable);
}

我想丰富现有代码并在收到一些不允许的数据时抛出异常,我做了以下更改:

    private Flux<Integer> testGetFluxTestData2() {
        return Flux.just(new TestData(1), new TestData(2))
                .collectList()
                .map(list -> {
                    return !list.contains(new TestData(1)) ?
                            list.stream()
                                    .map(TestData::getId)
                                    .collect(Collectors.toList()) :
                            Flux.error(new IllegalTestDataException("illegal test data 1"));
                })
                .flatMapMany(Flux::fromIterable);
    }

但由于以下行,我的实现甚至无法编译:

Flux.error(new IllegalTestDataException("illegal test data 1"));

请问如何针对我的特定场景处理异常抛出?

您正在尝试将 mapList<TestData> 转换为 List<Integer>Flux<?>(错误),这使得所需的结果类型不明确。通常不需要在映射函数中返回反应类型(您希望在平面映射函数中这样做)。

(旁注:即使你在 flatMap 中,它也不会起作用,因为那时你在 Mono API 中,因为 collectList, 所以 Mono.flatMap 期望 Mono 结果到 Function).

请注意,map 运算符从 lambda 捕获异常并将它们转换为 onError 信号,因此从技术上讲,您可以将 Flux.error 替换为 throw

否则,出于上述原因,您需要将 map 变为 flatMap,并将 Flux.error 变为 Mono.error