spring-kafka processing the same message multiple times with very long tasks

This question has been asked and answered several times, but with my very limited knowledge I could not find an answer to my problem.

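I have a producer that sends a work task to a consumer, and the task takes about two hours to complete. I need the task to run only once, but it completes and then starts over again and again.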

The most useful thing I found in the logs is

2018-05-15 15:18:23.731  WARN 6888 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.1ae85859-db41-4dc2-a7e2-ab4268256e00] Synchronous auto-commit of offsets {consumer-message-0=OffsetAndMetadata{offset=34, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

This led me to think that simply wrapping the consumer task in a thread would solve the problem, but it did not.

Some code from my consumer

@Component
@Slf4j
public class KafkaConsumer {

    private final CommandRunnerService commandRunnerService;

    public KafkaConsumer(CommandRunnerService commandRunnerService) {
        this.commandRunnerService = commandRunnerService;
    }

    @StreamListener(KafkaStreams.INPUT)
    public void handleWorkUnit(@Payload Steak steak) {
        commandRunnerService.executeCreateSteak(steak);
    }
}

handleWorkUnit takes a couple of hours to complete. So my attempted fix was

    @StreamListener(KafkaStreams.INPUT)
    public void handleWorkUnit(@Payload Steak steak) {
        Runnable task = () -> commandRunnerService.executeCreateSteak(steak);
        task.run();
    }

That made no difference.

I am using the out-of-the-box configuration, with only some very basic settings for the consumer

spring:
  application:
  cloud:
    stream:
      kafka:
        binder:
          brokers: 192.168.0.100
      bindings:
        consumer-message:
          destination: consumer-message
          contentType: application/json
        consumer-response:
          destination: consumer-response
          contentType: application/json

And the versions I am using:

ext {
    springCloudVersion = 'Finchley.RC1'
}

dependencies {
    compile('org.springframework.cloud:spring-cloud-stream')
    compile('org.springframework.cloud:spring-cloud-stream-binder-kafka')
    compile('org.springframework.kafka:spring-kafka')
}

As mentioned, I have seen many complex examples in the docs and on SO, but I am hoping there is a simple configuration fix? Or a more "beginner"-friendly example.

Cheers,

Please try fixing your code as follows.

@StreamListener(KafkaStreams.INPUT)
public void handleWorkUnit(@Payload Steak steak) {
    Runnable task = () -> commandRunnerService.executeCreateSteak(steak);
    new Thread(task).start();
}

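In your code, you are not creating any thread. Your code just calls the run method of the Runnable, which executes the task on the calling (listener) thread.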
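To make the difference concrete, here is a minimal, standalone sketch (not part of your application; the class name RunVersusStart and the thread name worker-1 are made up for illustration). It records which thread actually executes the task when you call run() directly versus when you hand the Runnable to a Thread and call start().

```java
import java.util.concurrent.atomic.AtomicReference;

public class RunVersusStart {

    // Calls run() directly: the task executes on the caller's thread.
    static String nameSeenByRun() {
        AtomicReference<String> seen = new AtomicReference<>();
        Runnable task = () -> seen.set(Thread.currentThread().getName());
        task.run();
        return seen.get();
    }

    // Wraps the task in a Thread and calls start(): the task executes on the new thread.
    static String nameSeenByStart() {
        AtomicReference<String> seen = new AtomicReference<>();
        Runnable task = () -> seen.set(Thread.currentThread().getName());
        Thread worker = new Thread(task, "worker-1"); // hypothetical thread name
        worker.start();
        try {
            worker.join(); // wait here only so the result can be read in this demo
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("run():   " + nameSeenByRun());
        System.out.println("start(): " + nameSeenByStart());
    }
}
```

With run(), the reported name is that of the calling thread, which in your listener would be the consumer thread that also has to keep polling. With start(), the listener method returns right away and the work continues elsewhere. One thing to keep in mind with that approach: since the listener returns immediately, the offset can be committed before the task has finished, so a crash in the middle of the task would not cause the message to be redelivered.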

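The relevant property is the consumer's max.poll.interval.ms, whose default value is 5 minutes. If you do not call poll() within that interval, the broker considers your consumer to have failed, rebalances the group, and assigns the partitions to another member. When that happens the offset commit is rejected, as in your log, so the same message is delivered again. That is probably the reason for what you are seeing.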
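If you would rather keep the processing on the listener thread, the other option the log message points to is raising that timeout. The fragment below is only a sketch of how it might look with the Kafka binder: it assumes the input binding is the consumer-message one from your configuration and that arbitrary Kafka consumer properties can be passed through the binding's consumer.configuration map. The value of 3 hours is a guess that leaves some headroom over a two-hour task, so adjust it to your real worst case.

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          consumer-message:
            consumer:
              configuration:
                # Assumed value: 3 hours in milliseconds, above the ~2 hour task.
                max.poll.interval.ms: 10800000
                # Fetch one record per poll so a single poll never covers several long tasks.
                max.poll.records: 1
```

The trade-off is that a consumer that really has hung will also take that long to be noticed, so its partitions will not be reassigned any sooner.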