使用 Reactor 抛出异常的正确方法

Correct way of throwing exceptions with Reactor

我是项目 Reactor 和响应式编程的新手。

我目前正在编写一段与此类似的代码:

Mono.just(userId)
    .map(repo::findById)
    .map(user-> {
        if(user == null){
            throw new UserNotFoundException();
        }
        return user;
    })
    // ... other mappings

这个例子可能很愚蠢,肯定有更好的方法来实现这个例子,但重点是:

map 块中使用 throw new 异常是错误的还是我应该用 return Mono.error(new UserNotFoundException()) 替换它?

这两种做法有实际区别吗?

有几种方法可以被认为是一种方便的异常抛出方法:

使用 Flux/Mono.handle

处理您的元素

可以简化可能导致错误或空流的元素处理的方法之一是运算符 handle

下面的代码展示了我们如何使用它来解决我们的问题:

Mono.just(userId)
    .map(repo::findById)
    .handle((user, sink) -> {
        if(!isValid(user)){
            sink.error(new InvalidUserException());
        } else if (isSendable(user))
            sink.next(user);
        }
        else {
            //just ignore element
        }
    })

正如我们所见,.handle 运算符需要传递 BiConsumer<T, SynchronousSink<> 才能处理元素。这里我们的 BiConsumer 中有两个参数。第一个是来自上游的元素,第二个是 SynchronousSink 这有助于我们同步向下游提供元素。这种技术扩展了提供元素处理的不同结果的能力。例如,如果元素无效,我们可以向相同的 SycnchronousSync 提供错误,这将取消上游并向下游产生 onError 信号。反过来,我们可以使用相同的 handle 运算符“过滤”。一旦句柄 BiConsumer 被执行并且没有提供任何元素,Reactor 将认为这是一种过滤并会为我们请求一个额外的元素。最后,如果元素有效,我们可以简单地调用 SynchronousSink#next 并向下游传播我们的元素或在其上应用一些映射,因此我们将 handle 作为此处的 map 运算符。此外,我们可以安全地使用该运算符而不会影响性能,并提供复杂的元素验证,例如元素验证或向下游发送错误。

使用 #concatMap + Mono.error

抛出

在映射期间抛出异常的选项之一是将 map 替换为 concatMap。从本质上讲,concatMap 的作用几乎与 flatMap 的作用相同。唯一的区别是 concatMap 一次只允许一个子流。这种行为大大简化了内部实现,并且不会影响性能。所以我们可以使用以下代码以更实用的方式抛出异常:

Mono.just(userId)
    .map(repo::findById)
    .concatMap(user-> {
        if(!isValid(user)){
            return Mono.error(new InvalidUserException());
        }
        return Mono.just(user);
    })

在上面的示例中,对于无效用户,我们使用 Mono.error return 异常。我们可以使用 Flux.error:

对通量做同样的事情
Flux.just(userId1, userId2, userId3)
    .map(repo::findById)
    .concatMap(user-> {
        if(!isValid(user)){
            return Flux.error(new InvalidUserException());
        }
        return Mono.just(user);
    })

注意,在这两种情况下,我们 return cold 流只有一个元素。在 Reactor 中,有一些优化可以在 returned 流是冷 scalar 流的情况下提高性能。因此,建议使用 Flux/Mono concatMap + .justemptyerror 作为结果,当我们需要更复杂的映射时,结果可能是return nullthrow new ....

Attention! Don't ever check incoming element on nullability. The Reactor Project will never send a null value for you since this violates Reactive Streams spec (see Rule 2.13) Thus, in case if repo.findById returns null, Reactor will throw NullPointerException for you.

等等,为什么 concatMapflatMap 好?

本质上,flatMap 旨在合并来自同时执行的多个子流的元素。这意味着 flatMap 应该在底层有异步流,因此它们可能会在多个线程上处理数据,或者可能是多个网络调用。随后,这样的期望对实现产生了很大影响,因此 flatMap 应该能够处理来自多个流的数据(Threads)(意味着使用并发数据结构),如果有另一个流,则将元素排入队列流(意味着为每个子流分配 Queues 的额外内存)并且不违反 Reactive Streams 规范规则(意味着非常复杂的实现)。计算所有这些事实以及我们将普通 map 操作(同步)替换为更方便的使用 Flux/Mono.error 抛出异常的方式(不会改变执行的同步性)这一事实导致事实上,我们不需要这么复杂的运算符,我们可以使用更简单的 concatMap,它是为一次异步处理单个流而设计的,并且有一些优化来处理标量、冷流。

使用 switchIfEmpty

引发异常

因此,另一种在结果为空时抛出异常的方法是 switchIfEmpty 运算符。以下代码演示了我们如何使用该方法:

Mono.just(userId)
    .flatMap(repo::findById)
    .switchIfEmpty(Mono.error(new UserNotFoundExeception()))

正如我们所见,在这种情况下 repo::findById 应该有 UserMono 作为 return 类型。因此,如果找不到 User 实例,结果流将为空。因此,Reactor 将调用替代 Mono,指定为 switchIfEmpty 参数。

按原样抛出异常(例如在 mapfilter 和其他类似运算符中)

它可以算作可读性较差的代码或不良做法(我自己的看法),但您可以将异常抛出为与 Project Reactor 是(例如 .map(v -> throw ...))。尽管这样做可能会以某种方式违反 Reactive Streams 规范(在这种情况下,从语义的角度来看违反了,因为你的操作员在引擎盖下是 Subscriber 链中的 Subscribers,因此——从语义上讲,在 lambda 中抛出异常可以映射为在违反 spec's rule 2.13onNext 方法中抛出异常。但是,由于 Reactor 会为您捕获抛出的异常并将其作为 onError 信号传播到您的下游,因此不禁止这样做。

外卖

  1. 使用.handle运算符以提供复杂元素处理
  2. 当我们需要在映射过程中抛出异常时使用concatMap+ Mono.error,但这种技术最适合异步元素处理的情况。
  3. 当我们已经有 flatMap 时使用 flatMap + Mono.error
  4. Null 因为 return 类型是被禁止的,所以在你的下游 map 中你会得到意想不到的 onError 而不是 nullNullPointerException
  5. 如果调用某些特定函数的结果以 empty stream
  6. 结束,则在所有需要发送错误信号的情况下使用 switchIfEmpty