使用 Reactor 抛出异常的正确方法
Correct way of throwing exceptions with Reactor
我是项目 Reactor 和响应式编程的新手。
我目前正在编写一段与此类似的代码:
Mono.just(userId)
.map(repo::findById)
.map(user-> {
if(user == null){
throw new UserNotFoundException();
}
return user;
})
// ... other mappings
这个例子可能很愚蠢,肯定有更好的方法来实现这个例子,但重点是:
在 map
块中使用 throw new
异常是错误的还是我应该用 return Mono.error(new UserNotFoundException())
替换它?
这两种做法有实际区别吗?
有几种方法可以被认为是一种方便的异常抛出方法:
使用 Flux/Mono.handle
处理您的元素
可以简化可能导致错误或空流的元素处理的方法之一是运算符 handle
。
下面的代码展示了我们如何使用它来解决我们的问题:
Mono.just(userId)
.map(repo::findById)
.handle((user, sink) -> {
if(!isValid(user)){
sink.error(new InvalidUserException());
} else if (isSendable(user))
sink.next(user);
}
else {
//just ignore element
}
})
正如我们所见,.handle
运算符需要传递 BiConsumer<T, SynchronousSink<>
才能处理元素。这里我们的 BiConsumer 中有两个参数。第一个是来自上游的元素,第二个是 SynchronousSink
这有助于我们同步向下游提供元素。这种技术扩展了提供元素处理的不同结果的能力。例如,如果元素无效,我们可以向相同的 SycnchronousSync
提供错误,这将取消上游并向下游产生 onError
信号。反过来,我们可以使用相同的 handle
运算符“过滤”。一旦句柄 BiConsumer
被执行并且没有提供任何元素,Reactor 将认为这是一种过滤并会为我们请求一个额外的元素。最后,如果元素有效,我们可以简单地调用 SynchronousSink#next
并向下游传播我们的元素或在其上应用一些映射,因此我们将 handle
作为此处的 map
运算符。此外,我们可以安全地使用该运算符而不会影响性能,并提供复杂的元素验证,例如元素验证或向下游发送错误。
使用 #concatMap
+ Mono.error
抛出
在映射期间抛出异常的选项之一是将 map
替换为 concatMap
。从本质上讲,concatMap
的作用几乎与 flatMap
的作用相同。唯一的区别是 concatMap
一次只允许一个子流。这种行为大大简化了内部实现,并且不会影响性能。所以我们可以使用以下代码以更实用的方式抛出异常:
Mono.just(userId)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Mono.error(new InvalidUserException());
}
return Mono.just(user);
})
在上面的示例中,对于无效用户,我们使用 Mono.error
return 异常。我们可以使用 Flux.error
:
对通量做同样的事情
Flux.just(userId1, userId2, userId3)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Flux.error(new InvalidUserException());
}
return Mono.just(user);
})
注意,在这两种情况下,我们 return cold 流只有一个元素。在 Reactor 中,有一些优化可以在 returned 流是冷 scalar 流的情况下提高性能。因此,建议使用 Flux/Mono concatMap
+ .just
、empty
、error
作为结果,当我们需要更复杂的映射时,结果可能是return null
或 throw new ...
.
Attention! Don't ever check incoming element on nullability. The Reactor Project will never send a null
value for you since this violates Reactive Streams spec (see Rule 2.13) Thus, in case if repo.findById
returns null, Reactor will throw NullPointerException for you.
等等,为什么 concatMap
比 flatMap
好?
本质上,flatMap
旨在合并来自同时执行的多个子流的元素。这意味着 flatMap 应该在底层有异步流,因此它们可能会在多个线程上处理数据,或者可能是多个网络调用。随后,这样的期望对实现产生了很大影响,因此 flatMap
应该能够处理来自多个流的数据(Thread
s)(意味着使用并发数据结构),如果有另一个流,则将元素排入队列流(意味着为每个子流分配 Queue
s 的额外内存)并且不违反 Reactive Streams 规范规则(意味着非常复杂的实现)。计算所有这些事实以及我们将普通 map
操作(同步)替换为更方便的使用 Flux/Mono.error
抛出异常的方式(不会改变执行的同步性)这一事实导致事实上,我们不需要这么复杂的运算符,我们可以使用更简单的 concatMap
,它是为一次异步处理单个流而设计的,并且有一些优化来处理标量、冷流。
使用 switchIfEmpty
引发异常
因此,另一种在结果为空时抛出异常的方法是 switchIfEmpty
运算符。以下代码演示了我们如何使用该方法:
Mono.just(userId)
.flatMap(repo::findById)
.switchIfEmpty(Mono.error(new UserNotFoundExeception()))
正如我们所见,在这种情况下 repo::findById
应该有 User
的 Mono
作为 return 类型。因此,如果找不到 User
实例,结果流将为空。因此,Reactor 将调用替代 Mono
,指定为 switchIfEmpty
参数。
按原样抛出异常(例如在 map
、filter
和其他类似运算符中)
它可以算作可读性较差的代码或不良做法(我自己的看法),但您可以将异常抛出为与 Project Reactor 是(例如 .map(v -> throw ...)
)。尽管这样做可能会以某种方式违反 Reactive Streams 规范(在这种情况下,从语义的角度来看违反了,因为你的操作员在引擎盖下是 Subscriber
链中的 Subscriber
s,因此——从语义上讲,在 lambda 中抛出异常可以映射为在违反 spec's rule 2.13 的 onNext
方法中抛出异常。但是,由于 Reactor 会为您捕获抛出的异常并将其作为 onError
信号传播到您的下游,因此不禁止这样做。
外卖
- 使用
.handle
运算符以提供复杂元素处理
- 当我们需要在映射过程中抛出异常时使用
concatMap
+ Mono.error
,但这种技术最适合异步元素处理的情况。
- 当我们已经有
flatMap
时使用 flatMap
+ Mono.error
Null
因为 return 类型是被禁止的,所以在你的下游 map
中你会得到意想不到的 onError
而不是 null
和 NullPointerException
- 如果调用某些特定函数的结果以 empty stream
结束,则在所有需要发送错误信号的情况下使用 switchIfEmpty
我是项目 Reactor 和响应式编程的新手。
我目前正在编写一段与此类似的代码:
Mono.just(userId)
.map(repo::findById)
.map(user-> {
if(user == null){
throw new UserNotFoundException();
}
return user;
})
// ... other mappings
这个例子可能很愚蠢,肯定有更好的方法来实现这个例子,但重点是:
在 map
块中使用 throw new
异常是错误的还是我应该用 return Mono.error(new UserNotFoundException())
替换它?
这两种做法有实际区别吗?
有几种方法可以被认为是一种方便的异常抛出方法:
使用 Flux/Mono.handle
处理您的元素
可以简化可能导致错误或空流的元素处理的方法之一是运算符 handle
。
下面的代码展示了我们如何使用它来解决我们的问题:
Mono.just(userId)
.map(repo::findById)
.handle((user, sink) -> {
if(!isValid(user)){
sink.error(new InvalidUserException());
} else if (isSendable(user))
sink.next(user);
}
else {
//just ignore element
}
})
正如我们所见,.handle
运算符需要传递 BiConsumer<T, SynchronousSink<>
才能处理元素。这里我们的 BiConsumer 中有两个参数。第一个是来自上游的元素,第二个是 SynchronousSink
这有助于我们同步向下游提供元素。这种技术扩展了提供元素处理的不同结果的能力。例如,如果元素无效,我们可以向相同的 SycnchronousSync
提供错误,这将取消上游并向下游产生 onError
信号。反过来,我们可以使用相同的 handle
运算符“过滤”。一旦句柄 BiConsumer
被执行并且没有提供任何元素,Reactor 将认为这是一种过滤并会为我们请求一个额外的元素。最后,如果元素有效,我们可以简单地调用 SynchronousSink#next
并向下游传播我们的元素或在其上应用一些映射,因此我们将 handle
作为此处的 map
运算符。此外,我们可以安全地使用该运算符而不会影响性能,并提供复杂的元素验证,例如元素验证或向下游发送错误。
使用 #concatMap
+ Mono.error
抛出
在映射期间抛出异常的选项之一是将 map
替换为 concatMap
。从本质上讲,concatMap
的作用几乎与 flatMap
的作用相同。唯一的区别是 concatMap
一次只允许一个子流。这种行为大大简化了内部实现,并且不会影响性能。所以我们可以使用以下代码以更实用的方式抛出异常:
Mono.just(userId)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Mono.error(new InvalidUserException());
}
return Mono.just(user);
})
在上面的示例中,对于无效用户,我们使用 Mono.error
return 异常。我们可以使用 Flux.error
:
Flux.just(userId1, userId2, userId3)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Flux.error(new InvalidUserException());
}
return Mono.just(user);
})
注意,在这两种情况下,我们 return cold 流只有一个元素。在 Reactor 中,有一些优化可以在 returned 流是冷 scalar 流的情况下提高性能。因此,建议使用 Flux/Mono concatMap
+ .just
、empty
、error
作为结果,当我们需要更复杂的映射时,结果可能是return null
或 throw new ...
.
Attention! Don't ever check incoming element on nullability. The Reactor Project will never send a
null
value for you since this violates Reactive Streams spec (see Rule 2.13) Thus, in case ifrepo.findById
returns null, Reactor will throw NullPointerException for you.
等等,为什么 concatMap
比 flatMap
好?
本质上,flatMap
旨在合并来自同时执行的多个子流的元素。这意味着 flatMap 应该在底层有异步流,因此它们可能会在多个线程上处理数据,或者可能是多个网络调用。随后,这样的期望对实现产生了很大影响,因此 flatMap
应该能够处理来自多个流的数据(Thread
s)(意味着使用并发数据结构),如果有另一个流,则将元素排入队列流(意味着为每个子流分配 Queue
s 的额外内存)并且不违反 Reactive Streams 规范规则(意味着非常复杂的实现)。计算所有这些事实以及我们将普通 map
操作(同步)替换为更方便的使用 Flux/Mono.error
抛出异常的方式(不会改变执行的同步性)这一事实导致事实上,我们不需要这么复杂的运算符,我们可以使用更简单的 concatMap
,它是为一次异步处理单个流而设计的,并且有一些优化来处理标量、冷流。
使用 switchIfEmpty
引发异常
因此,另一种在结果为空时抛出异常的方法是 switchIfEmpty
运算符。以下代码演示了我们如何使用该方法:
Mono.just(userId)
.flatMap(repo::findById)
.switchIfEmpty(Mono.error(new UserNotFoundExeception()))
正如我们所见,在这种情况下 repo::findById
应该有 User
的 Mono
作为 return 类型。因此,如果找不到 User
实例,结果流将为空。因此,Reactor 将调用替代 Mono
,指定为 switchIfEmpty
参数。
按原样抛出异常(例如在 map
、filter
和其他类似运算符中)
它可以算作可读性较差的代码或不良做法(我自己的看法),但您可以将异常抛出为与 Project Reactor 是(例如 .map(v -> throw ...)
)。尽管这样做可能会以某种方式违反 Reactive Streams 规范(在这种情况下,从语义的角度来看违反了,因为你的操作员在引擎盖下是 Subscriber
链中的 Subscriber
s,因此——从语义上讲,在 lambda 中抛出异常可以映射为在违反 spec's rule 2.13 的 onNext
方法中抛出异常。但是,由于 Reactor 会为您捕获抛出的异常并将其作为 onError
信号传播到您的下游,因此不禁止这样做。
外卖
- 使用
.handle
运算符以提供复杂元素处理 - 当我们需要在映射过程中抛出异常时使用
concatMap
+Mono.error
,但这种技术最适合异步元素处理的情况。 - 当我们已经有
flatMap
时使用flatMap
+Mono.error
Null
因为 return 类型是被禁止的,所以在你的下游map
中你会得到意想不到的onError
而不是null
和NullPointerException
- 如果调用某些特定函数的结果以 empty stream 结束,则在所有需要发送错误信号的情况下使用
switchIfEmpty