KafkaBindingRebalanceListener Bean 未由 KafkaMessageChannelBinder Bean 自动装配
KafkaBindingRebalanceListener Bean not autowired by KafkaMessageChannelBinder Bean
文档非常简单,建议公开类型为 KafkaBindingRebalanceListener 的 Bean,并且将在内部调用 onPartitiosnAssigned 方法。我正在尝试以某种方式做同样的事情,而 spring 框架创建其 KafkaMessageChannelBinder Bean ObjectProvider.getIfUnique() 总是 return null 因为它找不到所需的 bean。似乎当应用程序启动时 SpringFramework strats 首先创建它的 Bean 并且无法找到 Rebalance Listener Bean,因为它尚未创建。以下是项目中的三个代码片段。如果我缺少任何指示应用程序在转到 Spring Framework 之前首先在应用程序包中创建 Bean 的内容,请提供帮助。
RebalanceListener
package io.spring.dataflow.sample.seekoffset.config;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
import org.springframework.stereotype.Component;
import java.util.Collection;
@Component
public class KafkaRebalanceListener implements KafkaBindingRebalanceListener {
Logger logger = LoggerFactory.getLogger(SeekOffsetConfig.class);
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
logger.debug("onPartitionsAssigned");
}
}
配置类
package io.spring.dataflow.sample.seekoffset.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
@EnableBinding(Sink.class)
public class SeekOffsetConfig {
Logger logger = LoggerFactory.getLogger(SeekOffsetConfig.class);
@StreamListener(Sink.INPUT)
public void receiveMessage(Message<String> message) {
logger.debug("receiveMessage()");
}
}
ApplicationClass
package io.spring.dataflow.sample.seekoffset;
import io.spring.dataflow.sample.seekoffset.config.KafkaRebalanceListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
public class SeekOffsetApplication {
Logger logger = LoggerFactory.getLogger(SeekOffsetApplication.class);
public static void main(String[] args) {
SpringApplication.run(SeekOffsetApplication.class, args);
}
}
您使用的是什么版本?这适用于 Boot 2.3.2 和 Hoxton.SR6:
@SpringBootApplication
@EnableBinding(Sink.class)
public class So63157778Application {
public static void main(String[] args) {
SpringApplication.run(So63157778Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
System.out.println(in);
}
@Bean
KafkaBindingRebalanceListener rebal() {
return new KafkaBindingRebalanceListener() {
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
System.out.println(bindingName + " assignments: " + partitions + ", initial call :" + initial);
}
};
}
}
input assignments: [input-0], initial call :true
这也适用于我:
@SpringBootApplication
@EnableBinding(Sink.class)
public class So63157778Application {
public static void main(String[] args) {
SpringApplication.run(So63157778Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
System.out.println(in);
}
}
@Component
class Foo implements KafkaBindingRebalanceListener {
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
System.out.println(bindingName + " assignments: " + partitions + ", initial call :" + initial);
}
}
文档非常简单,建议公开类型为 KafkaBindingRebalanceListener 的 Bean,并且将在内部调用 onPartitiosnAssigned 方法。我正在尝试以某种方式做同样的事情,而 spring 框架创建其 KafkaMessageChannelBinder Bean ObjectProvider.getIfUnique() 总是 return null 因为它找不到所需的 bean。似乎当应用程序启动时 SpringFramework strats 首先创建它的 Bean 并且无法找到 Rebalance Listener Bean,因为它尚未创建。以下是项目中的三个代码片段。如果我缺少任何指示应用程序在转到 Spring Framework 之前首先在应用程序包中创建 Bean 的内容,请提供帮助。
RebalanceListener
package io.spring.dataflow.sample.seekoffset.config;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
import org.springframework.stereotype.Component;
import java.util.Collection;
@Component
public class KafkaRebalanceListener implements KafkaBindingRebalanceListener {
Logger logger = LoggerFactory.getLogger(SeekOffsetConfig.class);
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
logger.debug("onPartitionsAssigned");
}
}
配置类
package io.spring.dataflow.sample.seekoffset.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
@EnableBinding(Sink.class)
public class SeekOffsetConfig {
Logger logger = LoggerFactory.getLogger(SeekOffsetConfig.class);
@StreamListener(Sink.INPUT)
public void receiveMessage(Message<String> message) {
logger.debug("receiveMessage()");
}
}
ApplicationClass
package io.spring.dataflow.sample.seekoffset;
import io.spring.dataflow.sample.seekoffset.config.KafkaRebalanceListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
public class SeekOffsetApplication {
Logger logger = LoggerFactory.getLogger(SeekOffsetApplication.class);
public static void main(String[] args) {
SpringApplication.run(SeekOffsetApplication.class, args);
}
}
您使用的是什么版本?这适用于 Boot 2.3.2 和 Hoxton.SR6:
@SpringBootApplication
@EnableBinding(Sink.class)
public class So63157778Application {
public static void main(String[] args) {
SpringApplication.run(So63157778Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
System.out.println(in);
}
@Bean
KafkaBindingRebalanceListener rebal() {
return new KafkaBindingRebalanceListener() {
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
System.out.println(bindingName + " assignments: " + partitions + ", initial call :" + initial);
}
};
}
}
input assignments: [input-0], initial call :true
这也适用于我:
@SpringBootApplication
@EnableBinding(Sink.class)
public class So63157778Application {
public static void main(String[] args) {
SpringApplication.run(So63157778Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
System.out.println(in);
}
}
@Component
class Foo implements KafkaBindingRebalanceListener {
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
System.out.println(bindingName + " assignments: " + partitions + ", initial call :" + initial);
}
}