@KafkaHandler 接收正常,但不使用@SendTo 发送回复。出了什么问题?

@KafkaHandler receives fine, but doesn't send reply with @SendTo. What's going wrong?

所以,我在 class 级别上使用 kafka 侦听器获得了这个 class。它有两个处理程序方法,两者都应该发回关于在消息的 headers 中发送的回复主题的回复。我们对消息使用 AVRO 模式,并启用特定阅读。它看起来像这样:

@Component
@Slf4j
@KafkaListener(
        id = "kvk_listener",
        topics = {"${kafka.topic.request.search}", "${kafka.topic.request.validation}"})
public class KvkListener {
    private final CompanySearchService companySearchService;
    private final CompanyDetailsService companyDetailsService;

@Autowired
public KvkListener(CompanySearchService companySearchService, CompanyDetailsService companyDetailsService) {
    this.companySearchService = companySearchService;
    this.companyDetailsService = companyDetailsService;
}

@SendTo
@KafkaHandler
public SearchResults listenForSearchRequests(@Payload Search search) {
    log.info("received thingie");
    return companySearchService.search(search.getCompanyName())
            .toSearchDetails();
}

@SendTo
@KafkaHandler
public CompanyDetails listenForValidationRequests(@Payload Lookup lookup) {
    log.info("received other thingie");
    return companyDetailsService.details(lookup.getCompanyId())
            .toCompanyDetails(lookup.getProductType());
}

现在,我正在另一个服务中使用 request-reply 连接器,但响应从未到达。几乎所有配置都是通过 spring 启动 auto-configuration。我的属性如下所示:

spring.kafka.bootstrap-servers=localhost:29095
spring.kafka.properties.schema.registry.url=http://localhost:8081
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=odp-company-kvk-service

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

spring.kafka.properties.specific.avro.reader=true

事情是这样的。两个处理程序都不会发送回复。他们可以正常接收,我可以在日志或调试器中看到它,但他们不回复。最奇怪的是,如果我将侦听器 set-up 切换为 2 个单独的侦听器,则确实会发送回复。像这样:

@Component
@Slf4j
public class KvkListener {
    private final CompanySearchService companySearchService;
    private final CompanyDetailsService companyDetailsService;

    @Autowired
    public KvkListener(CompanySearchService companySearchService, CompanyDetailsService companyDetailsService) {
        this.companySearchService = companySearchService;
        this.companyDetailsService = companyDetailsService;
    }

    @SendTo
    @KafkaListener(
            topics = "${kafka.topic.request.search}")
    public SearchResults listenForSearchRequests(@Payload Search search) {
        log.info("received thingie");
        return companySearchService.search(search.getCompanyName())
                .toSearchDetails();
    }

    @SendTo
    @KafkaListener(
            topics = "${kafka.topic.request.validation}")
    public CompanyDetails listenForValidationRequests(@Payload Lookup lookup) {
        log.info("received other thingie");
        return companyDetailsService.details(lookup.getCompanyId())
                .toCompanyDetails(lookup.getProductType());
    }
}

我的印象是@KafkaHandler set-up 几乎相同。行为上的区别在于,如果您以某种方式结束了一条没有所需模式的消息,则处理程序允许您为 'Object' 设置一个可以捕获它的默认侦听器。这是我希望看到的,所以我希望我能让它发挥作用。

我已经尝试了从完全覆盖所有 consumer/producer bean 到显式设置回复模板的所有方法。对于为什么这不会达到我的预期,我束手无策。请 Whosebug,帮我解决问题![​​=13=]

这是一个错误,我 opened an issue

我找到了更好的解决方法;注意 !{..} 而不是 #{...}.

@Component
@KafkaListener(id = "so62569951", topics = "so62569951")
class Foo {

    @KafkaHandler
    @SendTo("!{source.headers.kafka_replyTopic}")
    public String upcase(String in) {
        System.out.println(in);
        return in.toUpperCase();
    }

}