在 @KafkaListener 注释方法中使用反应式 webflux 代码
Using reactive webflux code inside of a @KafkaListener annotated method
我正在使用 spring-kafka 来实现一个从特定主题读取消息的消费者。所有这些消息都由它们处理,并通过 REST API 导出到另一个系统。为此,代码使用了来自 Spring Webflux 项目的 WebClient,这导致了响应式代码:
@KafkaListener(topics = "${some.topic}", groupId = "my-group-id")
public void listenToTopic(final ConsumerRecord<String, String> record) {
// minimal, non-reactive code here (logging, serizializing the string)
webClient.get().uri(...).retrieve().bodyToMono(String.class)
// long, reactive chain here
.subscribe();
}
现在我想知道这个设置是否合理,或者这是否会导致很多问题,因为来自 spring-kafka 的 KafkaListener 逻辑本身并不是反应性的。我想知道是否有必要改用reactor-kafka。
我对整个反应世界以及 kafka 世界的理解非常有限,但这是我目前假设上述设置需要的内容:
- listenToTopic 函数几乎会立即 return,因为大部分工作是在反应链中完成的,不会阻止函数 returning。这意味着,据我所知,KafkaListener 逻辑将假定消息已在该处得到正确处理,因此它可能会确认它并在某个时候也提交它。如果我理解正确,那么这意味着消息的处理可能会出现问题。当 KafkaListener 已经获取下一条记录时,工作仍然可以在之前的反应链中完成。这意味着如果应用程序依赖于按严格顺序完全处理的消息,那么上述设置将是错误的。但是,如果没有,那么上面的设置就可以了吗?
- 上述设置的另一个问题是,如果有大量消息传入,应用程序可能会超负荷工作。因为侦听器功能 return 几乎立即生效,所以可能会收到大量消息同时处理反应链内部。
- @KafkaListener 逻辑内置的重试逻辑在这里不会真正起作用,因为反应链内部的异常不会触发它。任何重试逻辑都必须由侦听器函数本身内部的反应式代码处理。
- 当使用 reactor-kafka 而不是 @KafkaListener 注释时,可以更改第 1 点中描述的行为。因为侦听器现在将集成到反应链中,所以只有当反应链实际上已经完成。这样,据我了解,只有在通过反应链完全处理一条消息后,才会获取下一条消息。这也可能解决第 2-4 点中描述的 issues/behaviour。
问题:我对情况的理解是否正确?是否还有我遗漏的此设置可能导致的其他问题?
您的理解是正确的;切换到非反应性休息客户端(例如 RestTemplate
)或使用 reactor-kafka
作为消费者。
我正在使用 spring-kafka 来实现一个从特定主题读取消息的消费者。所有这些消息都由它们处理,并通过 REST API 导出到另一个系统。为此,代码使用了来自 Spring Webflux 项目的 WebClient,这导致了响应式代码:
@KafkaListener(topics = "${some.topic}", groupId = "my-group-id")
public void listenToTopic(final ConsumerRecord<String, String> record) {
// minimal, non-reactive code here (logging, serizializing the string)
webClient.get().uri(...).retrieve().bodyToMono(String.class)
// long, reactive chain here
.subscribe();
}
现在我想知道这个设置是否合理,或者这是否会导致很多问题,因为来自 spring-kafka 的 KafkaListener 逻辑本身并不是反应性的。我想知道是否有必要改用reactor-kafka。 我对整个反应世界以及 kafka 世界的理解非常有限,但这是我目前假设上述设置需要的内容:
- listenToTopic 函数几乎会立即 return,因为大部分工作是在反应链中完成的,不会阻止函数 returning。这意味着,据我所知,KafkaListener 逻辑将假定消息已在该处得到正确处理,因此它可能会确认它并在某个时候也提交它。如果我理解正确,那么这意味着消息的处理可能会出现问题。当 KafkaListener 已经获取下一条记录时,工作仍然可以在之前的反应链中完成。这意味着如果应用程序依赖于按严格顺序完全处理的消息,那么上述设置将是错误的。但是,如果没有,那么上面的设置就可以了吗?
- 上述设置的另一个问题是,如果有大量消息传入,应用程序可能会超负荷工作。因为侦听器功能 return 几乎立即生效,所以可能会收到大量消息同时处理反应链内部。
- @KafkaListener 逻辑内置的重试逻辑在这里不会真正起作用,因为反应链内部的异常不会触发它。任何重试逻辑都必须由侦听器函数本身内部的反应式代码处理。
- 当使用 reactor-kafka 而不是 @KafkaListener 注释时,可以更改第 1 点中描述的行为。因为侦听器现在将集成到反应链中,所以只有当反应链实际上已经完成。这样,据我了解,只有在通过反应链完全处理一条消息后,才会获取下一条消息。这也可能解决第 2-4 点中描述的 issues/behaviour。
问题:我对情况的理解是否正确?是否还有我遗漏的此设置可能导致的其他问题?
您的理解是正确的;切换到非反应性休息客户端(例如 RestTemplate
)或使用 reactor-kafka
作为消费者。