从 Micronaut 中的 Listener 方法抛出 RabbitListenerException
Throw RabbitListenerException from the Listener method in Micronaut
我在 Micronaut 中有一个消费者异常处理程序,如下所示
@Singleton
@Primary
@Replaces(DefaultRabbitListenerExceptionHandler.class)
public class RabbitListenerCustomExceptionHandler implements RabbitListenerExceptionHandler {
private static final Logger LOG = LoggerFactory.getLogger(RabbitListenerCustomExceptionHandler.class);
@Override
public void handle(RabbitListenerException exception) {
if (LOG.isErrorEnabled()) {
Optional<RabbitConsumerState> messageState = exception.getMessageState();
if (messageState.isPresent()) {
LOG.error("Error processing a message for RabbitMQ consumer [" + exception.getListener() + "]", exception);
} else {
LOG.error("RabbitMQ consumer [" + exception.getListener() + "] produced an error", exception);
}
}
}
}
从侦听器方法,我想抛出异常然后在上面的方法中捕获。
@RabbitListener
public class SubCategoryListener {
@Queue(ConstantValues.ADD_SUB_CATEGORY)
public void Create(SubCategoryViewModel model) {
LOG.info(String.format("Listener --> Adding the sub category collection"));
Category category = new Category();
category.setSubCategory(List.of(new CategorySubCategory(model.name(), model.description())));
SubCategorySearchCriteria criteria = new SubCategorySearchCriteria();
Document document = UpdateQueryBuilder.QueryBuilder(model, Document.class);
Bson queryId = QueryBuilder.QueryBuilder(criteria, Bson.class).get(0);
Single.fromPublisher(
this.repository.getCollection(ConstantValues.PRODUCT_CATEGORY_COLLECTION_NAME, Category.class)
.updateOne(queryId, document))
.subscribe(item -> {}, error ->{
throw new RabbitListenerException(error, null, null);
});
}
}
Micronaut 库中的 RabbitListenerException 包含以下参数
//Micronaut rabbit library
public RabbitListenerException(Throwable cause, Object listener, @Nullable RabbitConsumerState messageState) {
super(cause.getMessage(), cause);
this.listener = listener;
this.messageState = messageState;
}
我不确定如何从我的侦听器创建方法传递 listener 和 messageState。
更新 1
如果我没有抛出异常,则以下语句会产生 RXJava 异常 (io.reactivex.exceptions)
Single.fromPublisher(
this.repository.getCollection(ConstantValues.PRODUCT_CATEGORY_COLLECTION_NAME, Category.class)
.updateOne(queryId, document))
.subscribe(item -> {});
异常
Caused by: com.mongodb.MongoWriteException: Modifiers operate on fields but we found type array instead. For example: {$mod: {<field>: ...}} not {$push: [ { subCategory.name: "This is name update", subCategory.description: "This is update" } ]}
at com.mongodb.internal.async.client.AsyncMongoCollectionImpl.lambda$executeSingleWriteRequest(AsyncMongoCollectionImpl.java:1075)
... 48 more
Exception in thread "Thread-1" io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | com.mongodb.MongoWriteException: Modifiers operate on fields but we found type array instead. For example: {$mod: {<field>: ...}} not {$push: [ { subCategory.name: "This is name update", subCategory.description: "This is update" } ]}
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java:46)
at io.reactivex.internal.operators.single.SingleFromPublisher$ToSingleObserver.onError(SingleFromPublisher.java:87)
at com.mongodb.reactivestreams.client.internal.AbstractSubscription.onError(AbstractSubscription.java:142)
at com.mongodb.reactivestreams.client.internal.SingleResultCallbackSubscription.lambda$requestInitialData[=14=](SingleResultCallbackSubscription.java:41)
at com.mongodb.internal.async.client.AsyncMongoCollectionImpl.lambda$executeUpdate(AsyncMongoCollectionImpl.java:711)
at com.mongodb.internal.async.client.AsyncMongoCollectionImpl.lambda$executeSingleWriteRequest(AsyncMongoCollectionImpl.java:1080)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.async.client.OperationExecutorImpl.onResult(OperationExecutorImpl.java:135)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.operation.OperationHelper$ConnectionReleasingWrappedCallback.onResult(OperationHelper.java:551)
at com.mongodb.internal.operation.MixedBulkWriteOperation.addBatchResult(MixedBulkWriteOperation.java:524)
at com.mongodb.internal.operation.MixedBulkWriteOperation.access00(MixedBulkWriteOperation.java:76)
at com.mongodb.internal.operation.MixedBulkWriteOperation.onResult(MixedBulkWriteOperation.java:506)
at com.mongodb.internal.operation.MixedBulkWriteOperation.onResult(MixedBulkWriteOperation.java:476)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.onResult(DefaultServer.java:288)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.connection.CommandProtocolImpl.onResult(CommandProtocolImpl.java:84)
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.onResult(DefaultConnectionPool.java:530)
at com.mongodb.internal.connection.UsageTrackingInternalConnection.onResult(UsageTrackingInternalConnection.java:142)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.connection.InternalStreamConnection.onResult(InternalStreamConnection.java:463)
at com.mongodb.internal.connection.InternalStreamConnection.onResult(InternalStreamConnection.java:440)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:745)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:712)
at com.mongodb.internal.connection.InternalStreamConnection.completed(InternalStreamConnection.java:582)
at com.mongodb.internal.connection.InternalStreamConnection.completed(InternalStreamConnection.java:579)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:250)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:233)
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:158)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:568)
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:276)
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:297)
at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:144)
at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:118)
at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:107)
at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:579)
at com.mongodb.internal.connection.InternalStreamConnection.access00(InternalStreamConnection.java:78)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:702)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:687)
at com.mongodb.internal.connection.InternalStreamConnection.completed(InternalStreamConnection.java:582)
at com.mongodb.internal.connection.InternalStreamConnection.completed(InternalStreamConnection.java:579)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:250)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:233)
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:443)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:193)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:215)
at java.base/sun.nio.ch.KQueuePort$EventHandlerTask.run(KQueuePort.java:312)
at java.base/sun.nio.ch.AsynchronousChannelGroupImpl.run(AsynchronousChannelGroupImpl.java:112)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: com.mongodb.MongoWriteException: Modifiers operate on fields but we found type array instead. For example: {$mod: {<field>: ...}} not {$push: [ { subCategory.name: "This is name update", subCategory.description: "This is update" } ]}
at com.mongodb.internal.async.client.AsyncMongoCollectionImpl.lambda$executeSingleWriteRequest(AsyncMongoCollectionImpl.java:1075)
... 48 more
上面的异常没有被RabbitListenerCustomExceptionHandler
捕获
你根本不应该抛出那个异常。您抛出的任何异常都将为您包装在 RabbitListenerException 中。
我在 Micronaut 中有一个消费者异常处理程序,如下所示
@Singleton
@Primary
@Replaces(DefaultRabbitListenerExceptionHandler.class)
public class RabbitListenerCustomExceptionHandler implements RabbitListenerExceptionHandler {
private static final Logger LOG = LoggerFactory.getLogger(RabbitListenerCustomExceptionHandler.class);
@Override
public void handle(RabbitListenerException exception) {
if (LOG.isErrorEnabled()) {
Optional<RabbitConsumerState> messageState = exception.getMessageState();
if (messageState.isPresent()) {
LOG.error("Error processing a message for RabbitMQ consumer [" + exception.getListener() + "]", exception);
} else {
LOG.error("RabbitMQ consumer [" + exception.getListener() + "] produced an error", exception);
}
}
}
}
从侦听器方法,我想抛出异常然后在上面的方法中捕获。
@RabbitListener
public class SubCategoryListener {
@Queue(ConstantValues.ADD_SUB_CATEGORY)
public void Create(SubCategoryViewModel model) {
LOG.info(String.format("Listener --> Adding the sub category collection"));
Category category = new Category();
category.setSubCategory(List.of(new CategorySubCategory(model.name(), model.description())));
SubCategorySearchCriteria criteria = new SubCategorySearchCriteria();
Document document = UpdateQueryBuilder.QueryBuilder(model, Document.class);
Bson queryId = QueryBuilder.QueryBuilder(criteria, Bson.class).get(0);
Single.fromPublisher(
this.repository.getCollection(ConstantValues.PRODUCT_CATEGORY_COLLECTION_NAME, Category.class)
.updateOne(queryId, document))
.subscribe(item -> {}, error ->{
throw new RabbitListenerException(error, null, null);
});
}
}
Micronaut 库中的 RabbitListenerException 包含以下参数
//Micronaut rabbit library
public RabbitListenerException(Throwable cause, Object listener, @Nullable RabbitConsumerState messageState) {
super(cause.getMessage(), cause);
this.listener = listener;
this.messageState = messageState;
}
我不确定如何从我的侦听器创建方法传递 listener 和 messageState。
更新 1
如果我没有抛出异常,则以下语句会产生 RXJava 异常 (io.reactivex.exceptions)
Single.fromPublisher(
this.repository.getCollection(ConstantValues.PRODUCT_CATEGORY_COLLECTION_NAME, Category.class)
.updateOne(queryId, document))
.subscribe(item -> {});
异常
Caused by: com.mongodb.MongoWriteException: Modifiers operate on fields but we found type array instead. For example: {$mod: {<field>: ...}} not {$push: [ { subCategory.name: "This is name update", subCategory.description: "This is update" } ]}
at com.mongodb.internal.async.client.AsyncMongoCollectionImpl.lambda$executeSingleWriteRequest(AsyncMongoCollectionImpl.java:1075)
... 48 more
Exception in thread "Thread-1" io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | com.mongodb.MongoWriteException: Modifiers operate on fields but we found type array instead. For example: {$mod: {<field>: ...}} not {$push: [ { subCategory.name: "This is name update", subCategory.description: "This is update" } ]}
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java:46)
at io.reactivex.internal.operators.single.SingleFromPublisher$ToSingleObserver.onError(SingleFromPublisher.java:87)
at com.mongodb.reactivestreams.client.internal.AbstractSubscription.onError(AbstractSubscription.java:142)
at com.mongodb.reactivestreams.client.internal.SingleResultCallbackSubscription.lambda$requestInitialData[=14=](SingleResultCallbackSubscription.java:41)
at com.mongodb.internal.async.client.AsyncMongoCollectionImpl.lambda$executeUpdate(AsyncMongoCollectionImpl.java:711)
at com.mongodb.internal.async.client.AsyncMongoCollectionImpl.lambda$executeSingleWriteRequest(AsyncMongoCollectionImpl.java:1080)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.async.client.OperationExecutorImpl.onResult(OperationExecutorImpl.java:135)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.operation.OperationHelper$ConnectionReleasingWrappedCallback.onResult(OperationHelper.java:551)
at com.mongodb.internal.operation.MixedBulkWriteOperation.addBatchResult(MixedBulkWriteOperation.java:524)
at com.mongodb.internal.operation.MixedBulkWriteOperation.access00(MixedBulkWriteOperation.java:76)
at com.mongodb.internal.operation.MixedBulkWriteOperation.onResult(MixedBulkWriteOperation.java:506)
at com.mongodb.internal.operation.MixedBulkWriteOperation.onResult(MixedBulkWriteOperation.java:476)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.onResult(DefaultServer.java:288)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.connection.CommandProtocolImpl.onResult(CommandProtocolImpl.java:84)
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.onResult(DefaultConnectionPool.java:530)
at com.mongodb.internal.connection.UsageTrackingInternalConnection.onResult(UsageTrackingInternalConnection.java:142)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.connection.InternalStreamConnection.onResult(InternalStreamConnection.java:463)
at com.mongodb.internal.connection.InternalStreamConnection.onResult(InternalStreamConnection.java:440)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:745)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:712)
at com.mongodb.internal.connection.InternalStreamConnection.completed(InternalStreamConnection.java:582)
at com.mongodb.internal.connection.InternalStreamConnection.completed(InternalStreamConnection.java:579)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:250)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:233)
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:158)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:568)
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:276)
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:297)
at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:144)
at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:118)
at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:107)
at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:579)
at com.mongodb.internal.connection.InternalStreamConnection.access00(InternalStreamConnection.java:78)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:702)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:687)
at com.mongodb.internal.connection.InternalStreamConnection.completed(InternalStreamConnection.java:582)
at com.mongodb.internal.connection.InternalStreamConnection.completed(InternalStreamConnection.java:579)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:250)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:233)
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:443)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:193)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:215)
at java.base/sun.nio.ch.KQueuePort$EventHandlerTask.run(KQueuePort.java:312)
at java.base/sun.nio.ch.AsynchronousChannelGroupImpl.run(AsynchronousChannelGroupImpl.java:112)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: com.mongodb.MongoWriteException: Modifiers operate on fields but we found type array instead. For example: {$mod: {<field>: ...}} not {$push: [ { subCategory.name: "This is name update", subCategory.description: "This is update" } ]}
at com.mongodb.internal.async.client.AsyncMongoCollectionImpl.lambda$executeSingleWriteRequest(AsyncMongoCollectionImpl.java:1075)
... 48 more
上面的异常没有被RabbitListenerCustomExceptionHandler
捕获你根本不应该抛出那个异常。您抛出的任何异常都将为您包装在 RabbitListenerException 中。