Quarkus exception loading KafkaConsumerRebalanceListener
I have this KafkaConsumerRebalanceListener implementation:
@ApplicationScoped
@Identifier("responses")
public class KafkaPartitionListener implements KafkaConsumerRebalanceListener {

    private static final Logger logger = Logger.getLogger(KafkaPartitionListener.class);

    public Option<Integer> partition;

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        logger.info("Topic partition collection received. Assigning partition");
        partition = List.ofAll(partitions)
                .headOption()
                .map(tp -> {
                    logger.infov("Partition={0} assigned", tp.partition());
                    return tp.partition();
                });
    }
}
When I add this to my application.properties file:
mp.messaging.incoming.responses.consumer-rebalance-listener.name=responses
and inject the listener into another class:
@Inject
KafkaPartitionListener partitionListener;
I get this exception when running the service:
java.lang.RuntimeException: io.quarkus.builder.BuildException: Build failure: Build failed due to errors
[error]: Build step io.quarkus.arc.deployment.ArcProcessor#validate threw an exception: javax.enterprise.inject.spi.DeploymentException: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type com.KafkaPartitionListener and qualifiers [@Default]
- java member: com.rest.Controller#partitionListener
- declared on CLASS bean [types=[com.rest.Controller, java.lang.Object], qualifiers=[@Default, @Any], target=com.rest.Controller]
The following beans match by type, but none have matching qualifiers:
- Bean [class=com.KafkaPartitionListener, qualifiers=[@Identifier(value = "responses"), @Any]]
at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:1100)
at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:265)
at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:129)
at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:418)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at io.quarkus.deployment.ExtensionLoader.execute(ExtensionLoader.java:820)
at io.quarkus.builder.BuildContext.run(BuildContext.java:277)
at org.jboss.threads.ContextHandler.runWith(ContextHandler.java:18)
at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2449)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1478)
at java.base/java.lang.Thread.run(Thread.java:834)
at org.jboss.threads.JBossThread.run(JBossThread.java:501)
Any idea what is wrong?
Regards
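You must put the same @Identifier("responses") on the injection point. This is because @Identifier is itself annotated with @Qualifier: a bean that declares it no longer carries @Default, and @Default is what an unqualified @Inject asks for. That is exactly what the error says: the bean matches by type, but its qualifiers are [@Identifier(value = "responses"), @Any] while the injection point requires [@Default].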
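To make the qualifier rule concrete, here is a minimal plain-Java sketch of the matching logic. It is only an illustration, not how ArC is implemented: the Qualifier and Identifier annotations below are local stand-ins for javax.inject.Qualifier and io.smallrye.common.annotation.Identifier, the QualifierDemo class and its resolves helper are hypothetical, and real CDI resolution handles more cases (@Any, @Named, producers, and so on). The `qualified` field shows what the corrected injection point would look like in the real Controller, where it would also carry @Inject.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class QualifierDemo {

    // Local stand-in for javax.inject.Qualifier (assumption, for illustration only).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.ANNOTATION_TYPE)
    @interface Qualifier {}

    // Local stand-in for io.smallrye.common.annotation.Identifier.
    // The important part is that it is meta-annotated with @Qualifier.
    @Qualifier
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.FIELD})
    @interface Identifier {
        String value();
    }

    // Simplified copy of the bean from the question.
    @Identifier("responses")
    static class KafkaPartitionListener {}

    static class Controller {
        // No qualifier: treated as @Default, which the bean does not have.
        KafkaPartitionListener unqualified;

        // Same qualifier as the bean: this is the fix.
        @Identifier("responses")
        KafkaPartitionListener qualified;
    }

    // Keeps only the annotations whose type is itself marked with @Qualifier.
    static Set<Annotation> qualifiersOf(Annotation[] annotations) {
        return Arrays.stream(annotations)
                .filter(a -> a.annotationType().isAnnotationPresent(Qualifier.class))
                .collect(Collectors.toSet());
    }

    // Simplified resolution: the type must match, and every qualifier required
    // by the injection point must be declared on the bean.
    static boolean resolves(Field injectionPoint, Class<?> beanClass) {
        if (!injectionPoint.getType().isAssignableFrom(beanClass)) {
            return false;
        }
        Set<Annotation> required = qualifiersOf(injectionPoint.getAnnotations());
        Set<Annotation> declared = qualifiersOf(beanClass.getAnnotations());
        if (required.isEmpty()) {
            // An unqualified injection point means @Default, and a bean only
            // has @Default when it declares no qualifier of its own.
            return declared.isEmpty();
        }
        return declared.containsAll(required);
    }

    public static void main(String[] args) throws Exception {
        Field unqualified = Controller.class.getDeclaredField("unqualified");
        Field qualified = Controller.class.getDeclaredField("qualified");
        System.out.println(resolves(unqualified, KafkaPartitionListener.class)); // false
        System.out.println(resolves(qualified, KafkaPartitionListener.class));   // true
    }
}
```

The unqualified field fails to resolve for the same reason the build fails in the question, and adding the matching @Identifier("responses") makes it resolve.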