KafkaBindingRebalanceListener Bean 未由 KafkaMessageChannelBinder Bean 自动装配

KafkaBindingRebalanceListener Bean not autowired by KafkaMessageChannelBinder Bean

文档写得很简单:只需公开一个类型为 KafkaBindingRebalanceListener 的 Bean,绑定器就会在内部调用它的 onPartitionsAssigned 方法。我正尝试照此去做,但 Spring 框架在创建其 KafkaMessageChannelBinder Bean 时,ObjectProvider.getIfUnique() 总是返回 null,因为它找不到所需的 Bean。看起来应用程序启动时,Spring 框架会先创建它自己的 Bean,而此时 Rebalance Listener Bean 尚未创建,所以无法找到。以下是项目中的三个代码片段。如果我遗漏了某些配置,而这些配置可以让应用程序包中的 Bean 先于 Spring 框架自身的 Bean 被创建,请帮忙指出。

RebalanceListener

package io.spring.dataflow.sample.seekoffset.config;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
import org.springframework.stereotype.Component;

import java.util.Collection;

/**
 * {@link KafkaBindingRebalanceListener} implementation registered as a Spring
 * bean via {@code @Component}.
 *
 * <p>It only overrides {@code onPartitionsAssigned} and writes a debug log line
 * when that callback is invoked.
 */
@Component
public class KafkaRebalanceListener implements KafkaBindingRebalanceListener {
    // The logger is named after this class (it previously used SeekOffsetConfig.class),
    // so log output is attributed to the listener rather than to the config class.
    private static final Logger logger = LoggerFactory.getLogger(KafkaRebalanceListener.class);

    /**
     * Logs a debug message when the callback fires; none of the arguments are used.
     *
     * @param bindingName name of the binding the callback is for
     * @param consumer    the Kafka consumer passed in by the caller
     * @param partitions  the partitions passed in by the caller
     * @param initial     flag passed in by the caller (presumably {@code true} on the
     *                    first assignment -- confirm against the binder documentation)
     */
    @Override
    public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
        logger.debug("onPartitionsAssigned");
    }
}

配置类

package io.spring.dataflow.sample.seekoffset.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

/**
 * Binds the application to {@link Sink} and declares the handler for
 * {@code Sink.INPUT}.
 */
@EnableBinding(Sink.class)
public class SeekOffsetConfig {
    // One shared, immutable logger per class instead of a mutable package-private
    // instance field.
    private static final Logger logger = LoggerFactory.getLogger(SeekOffsetConfig.class);

    /**
     * Handler for {@code Sink.INPUT}; it only writes a debug log line and does
     * not inspect the message.
     *
     * @param message the incoming message (unused)
     */
    @StreamListener(Sink.INPUT)
    public void receiveMessage(Message<String> message) {
        logger.debug("receiveMessage()");
    }
}

ApplicationClass

package io.spring.dataflow.sample.seekoffset;

import io.spring.dataflow.sample.seekoffset.config.KafkaRebalanceListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;

/**
 * Spring Boot entry point of the sample application.
 */
@SpringBootApplication
public class SeekOffsetApplication {
    // Declared static and final: the only method here is static, so an instance
    // field could never have been used from it.
    private static final Logger logger = LoggerFactory.getLogger(SeekOffsetApplication.class);

    /**
     * Starts the application by handing this class and the command-line
     * arguments to {@link SpringApplication#run}.
     *
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(SeekOffsetApplication.class, args);
    }
}

您使用的是什么版本?下面的代码在 Boot 2.3.2 和 Hoxton.SR6 下可以正常工作:

/**
 * Sample application that binds to {@link Sink}, prints every received payload
 * and exposes a {@link KafkaBindingRebalanceListener} bean.
 */
@SpringBootApplication
@EnableBinding(Sink.class)
public class So63157778Application {

    /**
     * Launches the application.
     *
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(So63157778Application.class, args);
    }

    /**
     * Declares the rebalance listener as a bean. The listener prints the binding
     * name, the partitions and the {@code initial} flag to standard output.
     *
     * @return an anonymous listener overriding only {@code onPartitionsAssigned}
     */
    @Bean
    KafkaBindingRebalanceListener rebal() {
        final KafkaBindingRebalanceListener listener = new KafkaBindingRebalanceListener() {

            @Override
            public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
                    Collection<TopicPartition> partitions, boolean initial) {
                final String report =
                        bindingName + " assignments: " + partitions + ", initial call :" + initial;
                System.out.println(report);
            }

        };
        return listener;
    }

    /**
     * Handler for {@code Sink.INPUT}; prints the payload to standard output.
     *
     * @param in the received payload
     */
    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        final String payload = in;
        System.out.println(payload);
    }

}
input assignments: [input-0], initial call :true

下面这种写法在我这里同样可以正常工作:

/**
 * Sample application that binds to {@link Sink} and prints every received
 * payload.
 */
@SpringBootApplication
@EnableBinding(Sink.class)
public class So63157778Application {

    /**
     * Handler for {@code Sink.INPUT}; prints the payload to standard output.
     *
     * @param in the received payload
     */
    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        final String payload = in;
        System.out.println(payload);
    }

    /**
     * Launches the application.
     *
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(So63157778Application.class, args);
    }

}

/**
 * {@link KafkaBindingRebalanceListener} registered via {@code @Component}.
 * It prints the binding name, the partitions and the {@code initial} flag to
 * standard output.
 */
@Component
class Foo implements KafkaBindingRebalanceListener {

    /**
     * Prints one line describing the callback arguments.
     *
     * @param bindingName name of the binding the callback is for
     * @param consumer    the Kafka consumer passed in by the caller (unused)
     * @param partitions  the partitions passed in by the caller
     * @param initial     flag passed in by the caller, printed as-is
     */
    @Override
    public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions, boolean initial) {
        final String report =
                bindingName + " assignments: " + partitions + ", initial call :" + initial;
        System.out.println(report);
    }

}