spring-amqp 在监听器容器关闭时清理资源
spring-amqp clean up resources on listener container shutdown
我有多个消息容器,每个容器有一个监听器。
每位听众
- 不是线程安全的
- 声明为@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- 不是无国籍的。它有一个已处理消息的内部缓冲区
- 每收到 X 条消息,它就会将缓冲区刷新到磁盘
写入这种类型的磁盘非常昂贵,实现所需吞吐量的唯一方法是仅每 X 条消息刷新缓冲区。
每个侦听器都写入自己的文件,因此无需锁定。
我想在 spring 容器关闭时刷新缓冲区。
每个侦听器都实现 DisposableBean 和一个 destroy() 方法来刷新缓冲区。
经过一些研究我发现,由于 bean 被声明为作用域原型,销毁方法将永远不会被 spring 调用,因为它在 bean 创建后就停止管理它。
在这种情况下 spring 关闭时,如何清理缓冲区中剩余的消息?
SimpleMessageListenerContainer 中是否有一个挂钩,我可以使用它来调用监听器中的 destroy 方法?
自己想出来了。只需创建一个自定义的 BeanPostProcessor 也实现了 DisposableBean.
@Component
public class MyListenerBeanPostProcessor implements BeanPostProcessor, DisposableBean {
private List<MyListener> listeners = new ArrayList<ExportVisitsListenerBase>();
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof MyListener) {
listeners.add((MyListener) bean);
}
return bean;
}
@Override
public void destroy() throws Exception {
for (MyListener listener : listeners) {
listener.destroy();
}
}
}
我有多个消息容器,每个容器有一个监听器。
每位听众
- 不是线程安全的
- 声明为@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- 不是无国籍的。它有一个已处理消息的内部缓冲区
- 每收到 X 条消息,它就会将缓冲区刷新到磁盘
写入这种类型的磁盘非常昂贵,实现所需吞吐量的唯一方法是仅每 X 条消息刷新缓冲区。
每个侦听器都写入自己的文件,因此无需锁定。
我想在 spring 容器关闭时刷新缓冲区。
每个侦听器都实现 DisposableBean 和一个 destroy() 方法来刷新缓冲区。
经过一些研究我发现,由于 bean 被声明为作用域原型,销毁方法将永远不会被 spring 调用,因为它在 bean 创建后就停止管理它。
在这种情况下 spring 关闭时,如何清理缓冲区中剩余的消息?
SimpleMessageListenerContainer 中是否有一个挂钩,我可以使用它来调用监听器中的 destroy 方法?
自己想出来了。只需创建一个自定义的 BeanPostProcessor 也实现了 DisposableBean.
@Component
public class MyListenerBeanPostProcessor implements BeanPostProcessor, DisposableBean {
private List<MyListener> listeners = new ArrayList<ExportVisitsListenerBase>();
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof MyListener) {
listeners.add((MyListener) bean);
}
return bean;
}
@Override
public void destroy() throws Exception {
for (MyListener listener : listeners) {
listener.destroy();
}
}
}