Kafka 流处理器线程安全吗?

Kafka stream processor thread safe?

我知道之前有人在这里问过这个问题:

但这对我来说很奇怪。根据文档(或者我可能遗漏了一些东西),每个分区都有一个任务,这意味着处理器的不同实例,并且每个任务都由不同的线程执行。但是当我测试的时候,我看到不同的线程可以获得不同的处理器实例。因此,如果您想在处理器中保留任何内存状态(老式方式),您必须锁定吗?

示例代码:

public class SomeProcessor extends AbstractProcessor<String, JsonObject> {

   private final String ID = UUID.randomUUID().toString();

   @Override
   public void process(String key, JsonObject value) {
     System.out.println("Thread id: " + Thread.currentThread().getId() +" ID: " + ID);

输出:

线程ID:88 ID:26b11094-a094-404b-b610-88b38cc9d1ef

线程id:88 ID:c667e669-9023-494b-9345-236777e9dfda

线程id:88 ID:c667e669-9023-494b-9345-236777e9dfda

线程 ID:90 ID:0a43ecb0-26f2-440d-88e2-87e0c9cc4927

线程id:90 ID:c667e669-9023-494b-9345-236777e9dfda

线程id:90 ID:c667e669-9023-494b-9345-236777e9dfda

有没有办法强制每个实例线程?

每个实例的线程数是一个配置参数(num.stream.threads,默认值为 1)。因此,如果您启动一个 KafkaStreams 实例,您将获得 num.stream.threads 个线程。

任务将工作拆分为并行单元(基于您输入的主题分区)并将分配给线程。因此,如果您有多个任务和一个线程,则所有任务都将分配给该线程。如果您有两个线程(所有 KafkaStreams 个实例的总和)每个线程执行大约 50% 的任务。

Note: because a Kafka Streams application is distributed in nature, there is no difference if you run a single KafkaStreams instance with multiple threads, or multiple KafkaStreams instanced with one thread each. Tasks will be distributed over all available threads of your application.

如果您想在任务之间共享任何数据结构并且您有一个以上的线程,则您有责任同步对该数据结构的访问。请注意,任务到线程的分配在运行时可能会发生变化,因此,所有访问都必须同步。 但是,不推荐使用此模式,因为它会限制可伸缩性。你应该设计没有共享数据结构的程序! 这样做的主要原因是,你的程序通常分布在多台机器上,因此,不同的 KafkaStreams 实例无法访问共享数据结构反正。共享数据结构只能在单个 JVM 中工作,但使用单个 JVM 可以防止应用程序横向扩展。