反应堆从多个反应方法中抛出常见错误的正确方法

Reactor proper way of throwing a common error from multiple reactive methods

我有一些响应式方法,当流为空时我想从中抛出一个常见异常,例如:

import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
public class Test {
    private final Mono<Integer> errorMono = Mono.error(() -> new Exception("Empty"));

    @org.junit.Test
    public void testErrors() {
        function1(null)
                .as(StepVerifier::create)
                .consumeErrorWith(Throwable::printStackTrace)
                .verify();
        function2(null)
                .as(StepVerifier::create)
                .consumeErrorWith(Throwable::printStackTrace)
                .verify();
        function3(null)
                .as(StepVerifier::create)
                .consumeErrorWith(Throwable::printStackTrace)
                .verify();
    }

    private Mono<Integer> function1(Integer input) {
        return Mono.justOrEmpty(input)
                .doOnNext(i -> System.out.println("Function 1 " + i))
                .switchIfEmpty(errorMono);
    }
    private Mono<Integer> function2(Integer input) {
        return Mono.justOrEmpty(input)
                .doOnNext(i -> System.out.println("Function 2 " + i))
                .switchIfEmpty(errorMono);
    }
    private Mono<Integer> function3(Integer input) {
        return Mono.justOrEmpty(input)
                .doOnNext(i -> System.out.println("Function 3 " + i))
                .switchIfEmpty(errorMono);
    }
}

对于这些方法中的每一个,异常堆栈跟踪都源自第 1 行,我无法找出哪些方法是空的:

private final Mono<Integer> errorMono = Mono.error(() -> new Exception("Empty"));

java.lang.Exception: Empty
        at Test.lambda$new[=11=](Test.java:10)
        at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:70)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4252)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
        at reactor.core.publisher.Operators.complete(Operators.java:135)
        at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4252)
        at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.toVerifierAndSubscribe(DefaultStepVerifierBuilder.java:868)
        at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:824)
        at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:816)
        at Test.testErrors(Test.java:17)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:59)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:306)
        at org.junit.runners.BlockJUnit4ClassRunner.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access0(ParentRunner.java:66)
        at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
        at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
    java.lang.Exception: Empty
        at Test.lambda$new[=11=](Test.java:10)
        at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:70)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4252)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
        at reactor.core.publisher.Operators.complete(Operators.java:135)
        at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4252)
        at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.toVerifierAndSubscribe(DefaultStepVerifierBuilder.java:868)
        at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:824)
        at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:816)
        at Test.testErrors(Test.java:21)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:59)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:306)
        at org.junit.runners.BlockJUnit4ClassRunner.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access0(ParentRunner.java:66)
        at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
        at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
    java.lang.Exception: Empty
        at Test.lambda$new[=11=](Test.java:10)
        at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:70)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4252)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
        at reactor.core.publisher.Operators.complete(Operators.java:135)
        at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4252)
        at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.toVerifierAndSubscribe(DefaultStepVerifierBuilder.java:868)
        at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:824)
        at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:816)
        at Test.testErrors(Test.java:25)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:59)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:306)
        at org.junit.runners.BlockJUnit4ClassRunner.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access0(ParentRunner.java:66)
        at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
        at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)

我如何编写一个通用的异常抛出机制来在堆栈跟踪中提供正确的功能?

您可以为每个函数创建一个特定的 Mono.error,而不是依赖单个 Mono<Integer> errorMono = Mono.error(() -> new Exception("Empty"));。然后你在这些 Mono.error 上写下具体消息,如下所示:

private Mono<Integer> function1(Integer input) {
        return Mono.justOrEmpty(input)
                .doOnNext(i -> System.out.println("Function 1 " + i))
                .switchIfEmpty(Mono.error(() -> new Exception("function 1")));
    }

    private Mono<Integer> function2(Integer input) {
        return Mono.justOrEmpty(input)
                .doOnNext(i -> System.out.println("Function 2 " + i))
                .switchIfEmpty(Mono.error(() -> new Exception("function 2")));
    }

    private Mono<Integer> function3(Integer input) {
        return Mono.justOrEmpty(input)
                .doOnNext(i -> System.out.println("Function 3 " + i))
                .switchIfEmpty(Mono.error(() -> new Exception("function 3")));
    }

然后您将能够在堆栈跟踪中看到哪个函数引发了您的错误:

java.lang.Exception: function 1
    at com.github.felipegutierrez.explore.advance.CustomExceptionTest.lambda$function1(CustomExceptionTest.java:30)
...
java.lang.Exception: function 2
    at com.github.felipegutierrez.explore.advance.CustomExceptionTest.lambda$function2(CustomExceptionTest.java:36)
...
java.lang.Exception: function 3
    at com.github.felipegutierrez.explore.advance.CustomExceptionTest.lambda$function3(CustomExceptionTest.java:42)