从 Guava EventBus 订阅者抛出异常

Throw exception from Guava EventBus subscriber

我正在同步使用 Guava EventBus。如果任何订阅者抛出异常,我如何回滚完整的事务?如何抛出不会被 EventBus 订阅者捕获的异常?

你只需要看看Guava的EventBusclass.

的源码

让我们从头开始:

How can I throw an Exception which will not be caught by the EventBus Subscriber?

订阅者的方法被com.google.common.eventbus.Dispatcher#dispatch方法依次调用。要调用您的订阅者的方法,EventBus 使用反射的方法 Method#invoke,如果被调用的方法抛出异常,该方法会抛出 InvocationTargetException

如您所见,InvocationTargetException(将环绕您的 Exception)按如下方式处理:

} catch (InvocationTargetException e) {
  if (e.getCause() instanceof Error) {
    throw (Error) e.getCause();
  }
  throw e;
}

在上层,异常是这样处理的:

try {
  invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
  bus.handleSubscriberException(e.getCause(), context(event));
}

TL;DR

因此,省略 EventBus 异常处理程序的唯一方法是在您的订阅方法中不抛出 Exception,而是抛出 Error - 这当然是一种不好的做法。

How can I rollback the complete transaction if any of the subscribers throw an Exception?

EventBus异常处理器通过调用com.google.common.eventbus.EventBus#handleSubscriberException方法来处理异常。它看起来像这样:

try {
  exceptionHandler.handleException(e, context);
} catch (Throwable e2) {
  // logging
}

因此,异常处理程序抛出的任何异常都无济于事。您有两个选择:

  1. 要么从你的订阅者方法中抛出错误(这太糟糕了)
  2. 或者在此流程中的任何位置手动将事务设置为仅回滚。我认为此类事情的最佳位置显然是 EventBus 异常处理程序。

继承EventBus,制作自己的eventBus,会抛出异常。 包必须 com.google.common.eventbus 因为 handleSubscriberException 是一个内部方法。

package com.google.common.eventbus;

import com.google.common.util.concurrent.MoreExecutors;

/**
 * A eventbus wihch will throw exceptions during event handle process.
 * @author ytm
 *
 */
public class ErrorThrowEventBus extends EventBus {

    /**
     * Creates a new EventBus with the given {@code identifier}.
     *
     * @param identifier a brief name for this bus, for logging purposes. Should be a valid Java
     *     identifier.
     */
    public ErrorThrowEventBus(String identifier) {
        super(
            identifier,
            MoreExecutors.directExecutor(),
            Dispatcher.perThreadDispatchQueue(),
            LoggingHandler.INSTANCE);
    }

    /**
     * Creates a new EventBus with the given {@link SubscriberExceptionHandler}.
     *
     * @param exceptionHandler Handler for subscriber exceptions.
     * @since 16.0
     */
    public ErrorThrowEventBus(SubscriberExceptionHandler exceptionHandler) {
        super(
            "default",
            MoreExecutors.directExecutor(),
            Dispatcher.perThreadDispatchQueue(),
            exceptionHandler);
    }

    /**
     * Just throw a EventHandleException if there's any exception.
     * @param e
     * @param context
     * @throws EventHandleException 
     */
    @Override
    void handleSubscriberException(Throwable e, SubscriberExceptionContext context) throws EventHandleException {
        throw new EventHandleException(e);
    }
}

我在发布者和订阅者之间使用 java.lang.ThreadLocal 变量解决了这个问题。

Publisher 需要包装在 class 中,它读取线程本地异常并抛出它

public void publish(Event event) {
  eventBus.post(event);
  if(threadLocalException != null) {
    Store threadLocalException in some variable say e
    Clear threadLocalException variable
    throw e;
  }
}

Subscriber需要包裹在一个class中才能在线程局部变量中设置异常

public abstract class EventSubscriber<T extends Event> {

  @Subscribe
  public void invoke(T event) {
    try {
      handle(event);
    } catch (Exception e) {
      Set thread local variable to e
    }
  }

  protected abstract void handle(T event);

}