Kafka 在 Spring 引导关闭之前手动确认
Kafka manual acknowledge before Spring Boot shutdown
所以问题很简单,但是我现在好久找不到答案
我对我的 Kafka 消费者进行了手动确认,在应用程序关闭之前,我想执行一些代码然后向 Kafka 确认。所以为此我使用@PreDestroy 注释:
@PreDestroy
private void beforeShutdown() {
//do some code
acknowledgement.acknowledge(); //this variable is stored on class level
现在的主要问题是 Kafka 消费者在执行之前关闭,因此消息实际上没有得到确认,我在启动应用程序时再次收到它们,所以我需要某种解决方法或其他将此函数指定为关闭前调用的第一件事的方法。证明可以在日志中看到:
Shutting down ExecutorService
Consumer stopped
Shutting down ExecutorService 'taskScheduler'
Shutting down ExecutorService 'applicationTaskExecutor'
EXECUTING MY CODE
[Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
如果有人有什么建议请告诉。
实现SmartLifeCycle
并将代码放在stop()
中。将 bean 放在非常高的 Phase
中,以便它在容器之前停止。默认情况下,容器处于 Integer.MAX_VALUE - 100
阶段,因此它必须高于该阶段。
编辑
class Listener implements SmartLifecycle { // default phase is last (after the containers for start, before for stop).
private volatile boolean running;
@Override
public void start() {
this.running = true;
}
@Override
public void stop() {
this.running = false;
}
@Override
public boolean isRunning() {
return this.running;
}
...
}
所以问题很简单,但是我现在好久找不到答案
我对我的 Kafka 消费者进行了手动确认,在应用程序关闭之前,我想执行一些代码然后向 Kafka 确认。所以为此我使用@PreDestroy 注释:
@PreDestroy
private void beforeShutdown() {
//do some code
acknowledgement.acknowledge(); //this variable is stored on class level
现在的主要问题是 Kafka 消费者在执行之前关闭,因此消息实际上没有得到确认,我在启动应用程序时再次收到它们,所以我需要某种解决方法或其他将此函数指定为关闭前调用的第一件事的方法。证明可以在日志中看到:
Shutting down ExecutorService
Consumer stopped
Shutting down ExecutorService 'taskScheduler'
Shutting down ExecutorService 'applicationTaskExecutor'
EXECUTING MY CODE
[Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
如果有人有什么建议请告诉。
实现SmartLifeCycle
并将代码放在stop()
中。将 bean 放在非常高的 Phase
中,以便它在容器之前停止。默认情况下,容器处于 Integer.MAX_VALUE - 100
阶段,因此它必须高于该阶段。
编辑
class Listener implements SmartLifecycle { // default phase is last (after the containers for start, before for stop).
private volatile boolean running;
@Override
public void start() {
this.running = true;
}
@Override
public void stop() {
this.running = false;
}
@Override
public boolean isRunning() {
return this.running;
}
...
}