Spring 引导 - Kafka 消费者 Bean 作用域
Spring Boot - Kafka Consumer Bean Scope
我在具有 SCOPE_REQUEST 范围的 Spring 启动应用程序中使用 CacheManager。
@Bean
@Scope(value = WebApplicationContext.SCOPE_REQUEST, proxyMode = ScopedProxyMode.TARGET_CLASS)
public CacheManager cacheManager() {
return new ConcurrentMapCacheManager();
}
我也在使用 Kafka 进行微服务之间的通信。实际上,我正在通过 Kafka 消费者接收事件,但出现以下错误:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.cacheManager': Scope 'request' is not active for the current thread;
...
Caused by: java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread?
很明显,侦听器线程中缺少 CacheManager bean。
我的目标是让 Spring Boot/Kafka 框架为每个消耗的 Kafka 事件创建平均值,就像它为 Web 请求一样。
我不知道如何实现,有人可以帮助我吗?
非常感谢,
祝你有美好的一天!
Request Scope 仅适用于 Web 应用程序;它不能与 Kafka 消费者一起使用。
@加里罗素
这同时是正确的和错误的,同时我成功找到了解决方案,创建以下 class:
public class KafkaRequestScopeAttributes implements RequestAttributes {
private Map<String, Object> requestAttributeMap = new HashMap<>();
@Override
public Object getAttribute(String name, int scope) {
if (scope == RequestAttributes.SCOPE_REQUEST) {
return this.requestAttributeMap.get(name);
}
return null;
}
@Override
public void setAttribute(String name, Object value, int scope) {
if (scope == RequestAttributes.SCOPE_REQUEST) {
this.requestAttributeMap.put(name, value);
}
}
@Override
public void removeAttribute(String name, int scope) {
if (scope == RequestAttributes.SCOPE_REQUEST) {
this.requestAttributeMap.remove(name);
}
}
@Override
public String[] getAttributeNames(int scope) {
if (scope == RequestAttributes.SCOPE_REQUEST) {
return this.requestAttributeMap.keySet().toArray(new String[0]);
}
return new String[0];
}
@Override
public void registerDestructionCallback(String name, Runnable callback, int scope) {
// Not Supported
}
@Override
public Object resolveReference(String key) {
// Not supported
return null;
}
@Override
public String getSessionId() {
return null;
}
@Override
public Object getSessionMutex() {
return null;
}
}
然后将以下两行添加到您的 KafkaListener 方法的开始和结束:
RequestContextHolder.setRequestAttributes(new KafkaRequestScopeAttributes());
RequestContextHolder.resetRequestAttributes();
通过这样做,您可以强制在 Kafka 监听器中创建 REQUEST_SCOPE。
我在具有 SCOPE_REQUEST 范围的 Spring 启动应用程序中使用 CacheManager。
@Bean
@Scope(value = WebApplicationContext.SCOPE_REQUEST, proxyMode = ScopedProxyMode.TARGET_CLASS)
public CacheManager cacheManager() {
return new ConcurrentMapCacheManager();
}
我也在使用 Kafka 进行微服务之间的通信。实际上,我正在通过 Kafka 消费者接收事件,但出现以下错误:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.cacheManager': Scope 'request' is not active for the current thread;
...
Caused by: java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread?
很明显,侦听器线程中缺少 CacheManager bean。 我的目标是让 Spring Boot/Kafka 框架为每个消耗的 Kafka 事件创建平均值,就像它为 Web 请求一样。 我不知道如何实现,有人可以帮助我吗?
非常感谢, 祝你有美好的一天!
Request Scope 仅适用于 Web 应用程序;它不能与 Kafka 消费者一起使用。
@加里罗素 这同时是正确的和错误的,同时我成功找到了解决方案,创建以下 class:
public class KafkaRequestScopeAttributes implements RequestAttributes {
private Map<String, Object> requestAttributeMap = new HashMap<>();
@Override
public Object getAttribute(String name, int scope) {
if (scope == RequestAttributes.SCOPE_REQUEST) {
return this.requestAttributeMap.get(name);
}
return null;
}
@Override
public void setAttribute(String name, Object value, int scope) {
if (scope == RequestAttributes.SCOPE_REQUEST) {
this.requestAttributeMap.put(name, value);
}
}
@Override
public void removeAttribute(String name, int scope) {
if (scope == RequestAttributes.SCOPE_REQUEST) {
this.requestAttributeMap.remove(name);
}
}
@Override
public String[] getAttributeNames(int scope) {
if (scope == RequestAttributes.SCOPE_REQUEST) {
return this.requestAttributeMap.keySet().toArray(new String[0]);
}
return new String[0];
}
@Override
public void registerDestructionCallback(String name, Runnable callback, int scope) {
// Not Supported
}
@Override
public Object resolveReference(String key) {
// Not supported
return null;
}
@Override
public String getSessionId() {
return null;
}
@Override
public Object getSessionMutex() {
return null;
}
}
然后将以下两行添加到您的 KafkaListener 方法的开始和结束:
RequestContextHolder.setRequestAttributes(new KafkaRequestScopeAttributes());
RequestContextHolder.resetRequestAttributes();
通过这样做,您可以强制在 Kafka 监听器中创建 REQUEST_SCOPE。