Java - 在注释中的单独线程上处理时忽略异常

Java - Exceptions ignored while processing on separate thread within annotation

我们有一个注释,允许我们使用轮询消费者使用 Kafka 消息。它专为长时间 运行ning 作业而设计,因此一个线程正在处理消息,而另一个线程仍可用于轮询,以防止 Kafka 认为我们的服务失败并重新平衡消费者。

我们正在使用 Spring AOP。

Class:

@Aspect
@Component
@Slf4j
public class PollableStreamListenerAspect {

  private final ExecutorService executor = Executors.newFixedThreadPool(1);

  private volatile boolean paused = false;

  @Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
  public void receiveMessage(ProceedingJoinPoint joinPoint,
      PollableStreamListener pollableStreamListener, Object dataCapsule) {
    if (dataCapsule instanceof Message) {
      Message<?> message = (Message<?>) dataCapsule;
      AcknowledgmentCallback callback = StaticMessageHeaderAccessor
          .getAcknowledgmentCallback(message);
      callback.noAutoAck();

      if (!paused) {
        // The separate thread is not busy with a previous message, so process this message:
        Runnable runnable = () -> {
          try {
            paused = true;

            // Call method to process this Kafka message
            joinPoint.proceed();

            callback.acknowledge(Status.ACCEPT);
          } catch (Throwable e) {
            callback.acknowledge(Status.REJECT);
            throw new PollableStreamListenerException(e);
          } finally {
            paused = false;
          }
        };

        executor.submit(runnable);
      } else {
        // The separate thread is busy with a previous message, so re-queue this message for later:
        callback.acknowledge(Status.REQUEUE);
      }
    }
  }

}

我们一次只想处理一条消息,因此 paused 标志用于决定是现在处理该消息,还是重新排队等待稍后处理。

Runnable用于处理消息(使用joinPoint.proceed()),然后确认消息返回给Kafka。这是通过 executor.

在单独的线程中完成的

我发现如果在 runnable 中抛出异常,它会在 catch 语句中被捕获,但当我们抛出新的 PollableStreamListenerException 时不会传播异常,意味着失败不会在 DLQ 中结束。

我相信这是因为执行发生在另一个线程上,主线程已经继续执行(不等待第二个线程处理消息),因此不再能够传播到DLQ。但是,我可能会误会这一点,因为我对多线程的工作原理不是很熟悉。

我已经尝试将 executor.submit(runnable) 修改为 executor.submit(runnable).get(),这解决了问题。然而,它会导致阻塞主线程的执行,直到另一个线程完成执行,这意味着主线程不再可用于轮询新消息。实际上,这使我们的 Kafka 消费者不再是可轮询的消费者,这违背了使用注释的全部目的。

有谁知道是否可以让主线程继续 运行 并轮询消息,同时将 runnable 中抛出的异常传播到 DLQ?

在此先感谢您的帮助。


为了提供更多上下文,我们使用如下注释:

  @PollableStreamListener
  public void submitDeletion(Message<?> received) {
// Process message
}

只要在 Pollable 消息源上收到新消息,就会调用 submitDeletion 方法。我们使用 @Schedule:

检查新消息
 @Scheduled(fixedDelayString = "${app.pollable-consumer.time-interval}")
  public void pollForDeletionRequest() {
    log.trace("Polling for new messages");
    cleanupInput.poll(cleanupSubmissionService::submitDeletion);
  }

更新: 尝试使用 CompletableFuture

根据@kriegaex 的评论,我尝试使用 CompletableFuture。我简化了示例,使其更像是一个 POC。

@Aspect
@Component
@Slf4j
public class PollableStreamListenerAspect {

  private final ExecutorService executor = Executors.newFixedThreadPool(1);

  private volatile boolean paused = false;

  @Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
  public void receiveMessage(ProceedingJoinPoint joinPoint,
      PollableStreamListener pollableStreamListener, Object dataCapsule) {
    if (dataCapsule instanceof Message) {
      Message<?> message = (Message<?>) dataCapsule;
      AcknowledgmentCallback callback = StaticMessageHeaderAccessor
          .getAcknowledgmentCallback(message);
      callback.noAutoAck();

      if (!paused) {
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
          log.info("Start execution logging");
          try {
            Thread.sleep(10000);
          } catch (Exception e) {
            log.error("Error while sleeping", e);
          }
          log.info("End execution logging");
          throw new RuntimeException("Throwing exception to force handle statement");
        }, executor).handle((s, t) -> {
          log.info("Inside handle block:");

          if (t != null) {
            log.info(t.toString());
            throw new RuntimeException(t);
          }
          return null;
        });

        try {
          completableFuture.join();
        } catch (Exception e) {
          log.error("Error while doing join()", e);
        }

        callback.acknowledge(Status.ACCEPT);
      } else {
        // The separate thread is busy with a previous message, so re-queue this message for later:
        callback.acknowledge(Status.REQUEUE);
      }
    }
  }

}

我 运行 CompletableFuture 中的函数使用我的 ExecutorService 实例。它抛出异常,在 .handle() 块内处理。

我第一次测试没有下面这几行代码,发现另一个线程上的函数运行没有阻塞主线程轮询,但是handle()里面抛出的异常未传播到 DLQ。缺少的代码行:

try {
          completableFuture.join();
        } catch (Exception e) {
          log.error("Error while doing join()", e);
        }

然后我把那几行代码加进去,发现在等待completableFuture完成运行ning.

时开始阻塞主线程的执行

简而言之,使用 CompletableFuture 的行为似乎与我在初始 Runnable 中发现的相同。

作为最后的手段,我最终通过手动将错误发布到 DLQ 来处理 Runnable 中的错误,而不是依赖 Spring Cloud Stream 为我处理。最终结果有效,尽管这显然不是一个理想的解决方案。