@KafkaHandler 接收正常,但不使用@SendTo 发送回复。出了什么问题?
@KafkaHandler receives fine, but doesn't send reply with @SendTo. What's going wrong?
所以,我在 class 级别上使用 kafka 侦听器获得了这个 class。它有两个处理程序方法,两者都应该发回关于在消息的 headers 中发送的回复主题的回复。我们对消息使用 AVRO 模式,并启用特定阅读。它看起来像这样:
@Component
@Slf4j
@KafkaListener(
id = "kvk_listener",
topics = {"${kafka.topic.request.search}", "${kafka.topic.request.validation}"})
public class KvkListener {
private final CompanySearchService companySearchService;
private final CompanyDetailsService companyDetailsService;
@Autowired
public KvkListener(CompanySearchService companySearchService, CompanyDetailsService companyDetailsService) {
this.companySearchService = companySearchService;
this.companyDetailsService = companyDetailsService;
}
@SendTo
@KafkaHandler
public SearchResults listenForSearchRequests(@Payload Search search) {
log.info("received thingie");
return companySearchService.search(search.getCompanyName())
.toSearchDetails();
}
@SendTo
@KafkaHandler
public CompanyDetails listenForValidationRequests(@Payload Lookup lookup) {
log.info("received other thingie");
return companyDetailsService.details(lookup.getCompanyId())
.toCompanyDetails(lookup.getProductType());
}
现在,我正在另一个服务中使用 request-reply 连接器,但响应从未到达。几乎所有配置都是通过 spring 启动 auto-configuration。我的属性如下所示:
spring.kafka.bootstrap-servers=localhost:29095
spring.kafka.properties.schema.registry.url=http://localhost:8081
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=odp-company-kvk-service
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.properties.specific.avro.reader=true
事情是这样的。两个处理程序都不会发送回复。他们可以正常接收,我可以在日志或调试器中看到它,但他们不回复。最奇怪的是,如果我将侦听器 set-up 切换为 2 个单独的侦听器,则确实会发送回复。像这样:
@Component
@Slf4j
public class KvkListener {
private final CompanySearchService companySearchService;
private final CompanyDetailsService companyDetailsService;
@Autowired
public KvkListener(CompanySearchService companySearchService, CompanyDetailsService companyDetailsService) {
this.companySearchService = companySearchService;
this.companyDetailsService = companyDetailsService;
}
@SendTo
@KafkaListener(
topics = "${kafka.topic.request.search}")
public SearchResults listenForSearchRequests(@Payload Search search) {
log.info("received thingie");
return companySearchService.search(search.getCompanyName())
.toSearchDetails();
}
@SendTo
@KafkaListener(
topics = "${kafka.topic.request.validation}")
public CompanyDetails listenForValidationRequests(@Payload Lookup lookup) {
log.info("received other thingie");
return companyDetailsService.details(lookup.getCompanyId())
.toCompanyDetails(lookup.getProductType());
}
}
我的印象是@KafkaHandler set-up 几乎相同。行为上的区别在于,如果您以某种方式结束了一条没有所需模式的消息,则处理程序允许您为 'Object' 设置一个可以捕获它的默认侦听器。这是我希望看到的,所以我希望我能让它发挥作用。
我已经尝试了从完全覆盖所有 consumer/producer bean 到显式设置回复模板的所有方法。对于为什么这不会达到我的预期,我束手无策。请 Whosebug,帮我解决问题![=13=]
这是一个错误,我 opened an issue。
我找到了更好的解决方法;注意 !{..}
而不是 #{...}
.
@Component
@KafkaListener(id = "so62569951", topics = "so62569951")
class Foo {
@KafkaHandler
@SendTo("!{source.headers.kafka_replyTopic}")
public String upcase(String in) {
System.out.println(in);
return in.toUpperCase();
}
}
所以,我在 class 级别上使用 kafka 侦听器获得了这个 class。它有两个处理程序方法,两者都应该发回关于在消息的 headers 中发送的回复主题的回复。我们对消息使用 AVRO 模式,并启用特定阅读。它看起来像这样:
@Component
@Slf4j
@KafkaListener(
id = "kvk_listener",
topics = {"${kafka.topic.request.search}", "${kafka.topic.request.validation}"})
public class KvkListener {
private final CompanySearchService companySearchService;
private final CompanyDetailsService companyDetailsService;
@Autowired
public KvkListener(CompanySearchService companySearchService, CompanyDetailsService companyDetailsService) {
this.companySearchService = companySearchService;
this.companyDetailsService = companyDetailsService;
}
@SendTo
@KafkaHandler
public SearchResults listenForSearchRequests(@Payload Search search) {
log.info("received thingie");
return companySearchService.search(search.getCompanyName())
.toSearchDetails();
}
@SendTo
@KafkaHandler
public CompanyDetails listenForValidationRequests(@Payload Lookup lookup) {
log.info("received other thingie");
return companyDetailsService.details(lookup.getCompanyId())
.toCompanyDetails(lookup.getProductType());
}
现在,我正在另一个服务中使用 request-reply 连接器,但响应从未到达。几乎所有配置都是通过 spring 启动 auto-configuration。我的属性如下所示:
spring.kafka.bootstrap-servers=localhost:29095
spring.kafka.properties.schema.registry.url=http://localhost:8081
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=odp-company-kvk-service
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.properties.specific.avro.reader=true
事情是这样的。两个处理程序都不会发送回复。他们可以正常接收,我可以在日志或调试器中看到它,但他们不回复。最奇怪的是,如果我将侦听器 set-up 切换为 2 个单独的侦听器,则确实会发送回复。像这样:
@Component
@Slf4j
public class KvkListener {
private final CompanySearchService companySearchService;
private final CompanyDetailsService companyDetailsService;
@Autowired
public KvkListener(CompanySearchService companySearchService, CompanyDetailsService companyDetailsService) {
this.companySearchService = companySearchService;
this.companyDetailsService = companyDetailsService;
}
@SendTo
@KafkaListener(
topics = "${kafka.topic.request.search}")
public SearchResults listenForSearchRequests(@Payload Search search) {
log.info("received thingie");
return companySearchService.search(search.getCompanyName())
.toSearchDetails();
}
@SendTo
@KafkaListener(
topics = "${kafka.topic.request.validation}")
public CompanyDetails listenForValidationRequests(@Payload Lookup lookup) {
log.info("received other thingie");
return companyDetailsService.details(lookup.getCompanyId())
.toCompanyDetails(lookup.getProductType());
}
}
我的印象是@KafkaHandler set-up 几乎相同。行为上的区别在于,如果您以某种方式结束了一条没有所需模式的消息,则处理程序允许您为 'Object' 设置一个可以捕获它的默认侦听器。这是我希望看到的,所以我希望我能让它发挥作用。
我已经尝试了从完全覆盖所有 consumer/producer bean 到显式设置回复模板的所有方法。对于为什么这不会达到我的预期,我束手无策。请 Whosebug,帮我解决问题![=13=]
这是一个错误,我 opened an issue。
我找到了更好的解决方法;注意 !{..}
而不是 #{...}
.
@Component
@KafkaListener(id = "so62569951", topics = "so62569951")
class Foo {
@KafkaHandler
@SendTo("!{source.headers.kafka_replyTopic}")
public String upcase(String in) {
System.out.println(in);
return in.toUpperCase();
}
}