Java - 在注释中的单独线程上处理时忽略异常
Java - Exceptions ignored while processing on separate thread within annotation
我们有一个注释,允许我们使用轮询消费者使用 Kafka 消息。它专为长时间 运行ning 作业而设计,因此一个线程正在处理消息,而另一个线程仍可用于轮询,以防止 Kafka 认为我们的服务失败并重新平衡消费者。
我们正在使用 Spring AOP。
Class:
@Aspect
@Component
@Slf4j
public class PollableStreamListenerAspect {
private final ExecutorService executor = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
@Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
public void receiveMessage(ProceedingJoinPoint joinPoint,
PollableStreamListener pollableStreamListener, Object dataCapsule) {
if (dataCapsule instanceof Message) {
Message<?> message = (Message<?>) dataCapsule;
AcknowledgmentCallback callback = StaticMessageHeaderAccessor
.getAcknowledgmentCallback(message);
callback.noAutoAck();
if (!paused) {
// The separate thread is not busy with a previous message, so process this message:
Runnable runnable = () -> {
try {
paused = true;
// Call method to process this Kafka message
joinPoint.proceed();
callback.acknowledge(Status.ACCEPT);
} catch (Throwable e) {
callback.acknowledge(Status.REJECT);
throw new PollableStreamListenerException(e);
} finally {
paused = false;
}
};
executor.submit(runnable);
} else {
// The separate thread is busy with a previous message, so re-queue this message for later:
callback.acknowledge(Status.REQUEUE);
}
}
}
}
我们一次只想处理一条消息,因此 paused
标志用于决定是现在处理该消息,还是重新排队等待稍后处理。
Runnable
用于处理消息(使用joinPoint.proceed()
),然后确认消息返回给Kafka。这是通过 executor
.
在单独的线程中完成的
我发现如果在 runnable
中抛出异常,它会在 catch
语句中被捕获,但当我们抛出新的 PollableStreamListenerException
时不会传播异常,意味着失败不会在 DLQ 中结束。
我相信这是因为执行发生在另一个线程上,主线程已经继续执行(不等待第二个线程处理消息),因此不再能够传播到DLQ。但是,我可能会误会这一点,因为我对多线程的工作原理不是很熟悉。
我已经尝试将 executor.submit(runnable)
修改为 executor.submit(runnable).get()
,这解决了问题。然而,它会导致阻塞主线程的执行,直到另一个线程完成执行,这意味着主线程不再可用于轮询新消息。实际上,这使我们的 Kafka 消费者不再是可轮询的消费者,这违背了使用注释的全部目的。
有谁知道是否可以让主线程继续 运行 并轮询消息,同时将 runnable
中抛出的异常传播到 DLQ?
在此先感谢您的帮助。
为了提供更多上下文,我们使用如下注释:
@PollableStreamListener
public void submitDeletion(Message<?> received) {
// Process message
}
只要在 Pollable 消息源上收到新消息,就会调用 submitDeletion
方法。我们使用 @Schedule
:
检查新消息
@Scheduled(fixedDelayString = "${app.pollable-consumer.time-interval}")
public void pollForDeletionRequest() {
log.trace("Polling for new messages");
cleanupInput.poll(cleanupSubmissionService::submitDeletion);
}
更新: 尝试使用 CompletableFuture
根据@kriegaex 的评论,我尝试使用 CompletableFuture
。我简化了示例,使其更像是一个 POC。
@Aspect
@Component
@Slf4j
public class PollableStreamListenerAspect {
private final ExecutorService executor = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
@Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
public void receiveMessage(ProceedingJoinPoint joinPoint,
PollableStreamListener pollableStreamListener, Object dataCapsule) {
if (dataCapsule instanceof Message) {
Message<?> message = (Message<?>) dataCapsule;
AcknowledgmentCallback callback = StaticMessageHeaderAccessor
.getAcknowledgmentCallback(message);
callback.noAutoAck();
if (!paused) {
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
log.info("Start execution logging");
try {
Thread.sleep(10000);
} catch (Exception e) {
log.error("Error while sleeping", e);
}
log.info("End execution logging");
throw new RuntimeException("Throwing exception to force handle statement");
}, executor).handle((s, t) -> {
log.info("Inside handle block:");
if (t != null) {
log.info(t.toString());
throw new RuntimeException(t);
}
return null;
});
try {
completableFuture.join();
} catch (Exception e) {
log.error("Error while doing join()", e);
}
callback.acknowledge(Status.ACCEPT);
} else {
// The separate thread is busy with a previous message, so re-queue this message for later:
callback.acknowledge(Status.REQUEUE);
}
}
}
}
我 运行 CompletableFuture
中的函数使用我的 ExecutorService
实例。它抛出异常,在 .handle()
块内处理。
我第一次测试没有下面这几行代码,发现另一个线程上的函数运行没有阻塞主线程轮询,但是handle()
里面抛出的异常未传播到 DLQ。缺少的代码行:
try {
completableFuture.join();
} catch (Exception e) {
log.error("Error while doing join()", e);
}
然后我把那几行代码加进去,发现在等待completableFuture
完成运行ning.
时开始阻塞主线程的执行
简而言之,使用 CompletableFuture
的行为似乎与我在初始 Runnable
中发现的相同。
作为最后的手段,我最终通过手动将错误发布到 DLQ 来处理 Runnable
中的错误,而不是依赖 Spring Cloud Stream 为我处理。最终结果有效,尽管这显然不是一个理想的解决方案。
我们有一个注释,允许我们使用轮询消费者使用 Kafka 消息。它专为长时间 运行ning 作业而设计,因此一个线程正在处理消息,而另一个线程仍可用于轮询,以防止 Kafka 认为我们的服务失败并重新平衡消费者。
我们正在使用 Spring AOP。
Class:
@Aspect
@Component
@Slf4j
public class PollableStreamListenerAspect {
private final ExecutorService executor = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
@Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
public void receiveMessage(ProceedingJoinPoint joinPoint,
PollableStreamListener pollableStreamListener, Object dataCapsule) {
if (dataCapsule instanceof Message) {
Message<?> message = (Message<?>) dataCapsule;
AcknowledgmentCallback callback = StaticMessageHeaderAccessor
.getAcknowledgmentCallback(message);
callback.noAutoAck();
if (!paused) {
// The separate thread is not busy with a previous message, so process this message:
Runnable runnable = () -> {
try {
paused = true;
// Call method to process this Kafka message
joinPoint.proceed();
callback.acknowledge(Status.ACCEPT);
} catch (Throwable e) {
callback.acknowledge(Status.REJECT);
throw new PollableStreamListenerException(e);
} finally {
paused = false;
}
};
executor.submit(runnable);
} else {
// The separate thread is busy with a previous message, so re-queue this message for later:
callback.acknowledge(Status.REQUEUE);
}
}
}
}
我们一次只想处理一条消息,因此 paused
标志用于决定是现在处理该消息,还是重新排队等待稍后处理。
Runnable
用于处理消息(使用joinPoint.proceed()
),然后确认消息返回给Kafka。这是通过 executor
.
我发现如果在 runnable
中抛出异常,它会在 catch
语句中被捕获,但当我们抛出新的 PollableStreamListenerException
时不会传播异常,意味着失败不会在 DLQ 中结束。
我相信这是因为执行发生在另一个线程上,主线程已经继续执行(不等待第二个线程处理消息),因此不再能够传播到DLQ。但是,我可能会误会这一点,因为我对多线程的工作原理不是很熟悉。
我已经尝试将 executor.submit(runnable)
修改为 executor.submit(runnable).get()
,这解决了问题。然而,它会导致阻塞主线程的执行,直到另一个线程完成执行,这意味着主线程不再可用于轮询新消息。实际上,这使我们的 Kafka 消费者不再是可轮询的消费者,这违背了使用注释的全部目的。
有谁知道是否可以让主线程继续 运行 并轮询消息,同时将 runnable
中抛出的异常传播到 DLQ?
在此先感谢您的帮助。
为了提供更多上下文,我们使用如下注释:
@PollableStreamListener
public void submitDeletion(Message<?> received) {
// Process message
}
只要在 Pollable 消息源上收到新消息,就会调用 submitDeletion
方法。我们使用 @Schedule
:
@Scheduled(fixedDelayString = "${app.pollable-consumer.time-interval}")
public void pollForDeletionRequest() {
log.trace("Polling for new messages");
cleanupInput.poll(cleanupSubmissionService::submitDeletion);
}
更新: 尝试使用 CompletableFuture
根据@kriegaex 的评论,我尝试使用 CompletableFuture
。我简化了示例,使其更像是一个 POC。
@Aspect
@Component
@Slf4j
public class PollableStreamListenerAspect {
private final ExecutorService executor = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
@Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
public void receiveMessage(ProceedingJoinPoint joinPoint,
PollableStreamListener pollableStreamListener, Object dataCapsule) {
if (dataCapsule instanceof Message) {
Message<?> message = (Message<?>) dataCapsule;
AcknowledgmentCallback callback = StaticMessageHeaderAccessor
.getAcknowledgmentCallback(message);
callback.noAutoAck();
if (!paused) {
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
log.info("Start execution logging");
try {
Thread.sleep(10000);
} catch (Exception e) {
log.error("Error while sleeping", e);
}
log.info("End execution logging");
throw new RuntimeException("Throwing exception to force handle statement");
}, executor).handle((s, t) -> {
log.info("Inside handle block:");
if (t != null) {
log.info(t.toString());
throw new RuntimeException(t);
}
return null;
});
try {
completableFuture.join();
} catch (Exception e) {
log.error("Error while doing join()", e);
}
callback.acknowledge(Status.ACCEPT);
} else {
// The separate thread is busy with a previous message, so re-queue this message for later:
callback.acknowledge(Status.REQUEUE);
}
}
}
}
我 运行 CompletableFuture
中的函数使用我的 ExecutorService
实例。它抛出异常,在 .handle()
块内处理。
我第一次测试没有下面这几行代码,发现另一个线程上的函数运行没有阻塞主线程轮询,但是handle()
里面抛出的异常未传播到 DLQ。缺少的代码行:
try {
completableFuture.join();
} catch (Exception e) {
log.error("Error while doing join()", e);
}
然后我把那几行代码加进去,发现在等待completableFuture
完成运行ning.
简而言之,使用 CompletableFuture
的行为似乎与我在初始 Runnable
中发现的相同。
作为最后的手段,我最终通过手动将错误发布到 DLQ 来处理 Runnable
中的错误,而不是依赖 Spring Cloud Stream 为我处理。最终结果有效,尽管这显然不是一个理想的解决方案。