Spring Cloud Kafka 绑定同步 Rest 调用

Spring Cloud Kafka Binding with Synchronous Rest Call

我正在开发由 SpringMVC 和 Spring Cloud Kafka 提供支持的微服务。

为简单起见,我将只关注发出 HTTP 请求的部分。

我有如下绑定函数(请注意,我使用的是函数式绑定)。

@SpringBootApplication
public class ExampleApplication {

  // PayloadSender uses RestTemplate to send HTTP request.
  @Autowired
  private PayloadSender payloadSender;

  @Bean
  public Function<KStream<String, Input>, KStream<String, Output>> process() {

    // payloadSender.send() is a blocking call which sends payload using RestTemplate,
    // once response is received it will collect all info and create "Output" object
    return input -> input
      .map((k,v) -> KeyValue.pair(k, payloadSender.send(v))); // "send" is a blocking call

    // Question: if autoCommitOffset is set to true, would offset automatically commit right after the "map" function from KStream?
  }

  public static void main(String[] args) {
    SpringApplication.run(ExampleApplication.class, args);
  }
}

从此示例中,您可以看到 payloadSender 正在使用 RestTemplate 从输入流发送有效负载,并在收到响应后创建“Output”对象并生成到输出主题。

由于 payloadSender.send() 是阻塞的,我担心这会导致性能问题。最重要的是,如果 HTTP 请求超时,恐怕会超过提交间隔(通常 HTTP 超时间隔远大于消费者提交间隔)并导致 kafka broker 认为消费者已经死了(请更正如果我错了我)。

那么对于这种情况有没有更好的解决方案呢?我最终会切换到 spring-reactive 但目前我需要确保 MVC 模型有效。虽然我不确定 spring-reactive 是否会神奇地解决这个问题。

默认max.poll.interval为5分钟;您可以增加或减少 max.poll.records。您还可以在 rest 调用上设置超时。