手动确认 Kafka Batch 侦听器。多线程我做错了什么?

Manually acknowledge Kafka Batch listener. What I am doing wrong for multi-threading?

我正在创建一个应用程序,它使用来自 kafka 的数据并对其进行一些处理。因为,这个应用程序应该工作的规模是巨大的,我正在使用 Concurrent Batch listener(max.poll.records = 20) 和 Java 8 Executor 服务。下面是我卡住的带有注释的代码:

    //Below method is invoked by spring kafka concurrent batch listener
public void onMessage(List<ConsumerRecord<String,String>> records , Acknowledgement ack)
{
    CompletableFuture.runAsync(() -> records.stream().forEach(record -> 
    doSomeProcessingAsynchronously()),new ThreadPoolExecutor(10,10,0,TimeUnit.MILLISECONDS. new 
    LinkedBlockingQueue<>(20)));
    //I want to execute all consumed records first only then flow should come after this line
    //and acknowledge complete batch
    ack.acknowledge();
}

关于如何完成这项工作有什么建议吗?提前致谢

添加

CountDownLatch latch = new CountDownLatch(records.size());

然后在 lambda 中为每个完成的任务倒数。

在确认之前 boolean ok = latch.await(60, TimeUnit.SECONDS);

(检查超时并采取适当行动)。

records.stream().forEach(...

也就是说,你不是运行并行处理;你只是 运行 全部 20 个顺序在不同的线程上。

您需要将 forEach 移到外面。