Understanding the number of StreamProcessor instances created, and do stream tasks share the same StreamProcessor instance?

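I would like to understand in more detail the relationship between StreamThread and StreamTask, and how many StreamProcessor instances are created under the following conditions:

I have a simple processor topology:

source_topic --> Processor 1 --> Processor 2 --> Processor 3 --> sink_topic

Each processor simply forwards to the next processor in the chain. Below is a snippet of one of the processors. I am using the low-level Java API.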

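import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class Processor1 implements Processor<String, String> {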

    private ProcessorContext context;
    public Processor1() {
    
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void punctuate(long timestamp) {
        // TODO Auto-generated method stub
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }

    @Override
    public void process(String key, String value) {
        System.out.println("Inside Processor1#process() method");
        context.forward(key, value);
    }
}

Snippet of the main driver application:

Topology topology = new Topology();

topology.addSource("SOURCE", "source-topic-data");
topology.addProcessor("Processor1", () -> new Processor1(), "SOURCE");
topology.addProcessor("Processor2", () -> new Processor2(), "Processor1");
topology.addProcessor("Processor3", () -> new Processor3(), "Processor2");
topology.addSink("SINK", "sink-topic-data", "Processor3");

Properties settings = new Properties();
settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
StreamsConfig config = new StreamsConfig(settings);
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();

With this setup, I have the following questions:

(New question added to the original list)

Below is the printed topology:


KafkaStreams processID: 1602fe25-57ab-4620-99df-fd0c15d96e42
    StreamsThread appId: my-first-streams-application
        StreamsThread clientId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42
        StreamsThread threadId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42-StreamThread-1
        Active tasks:
            Running:                                StreamsTask taskId: 0_0
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-0]
                                StreamsTask taskId: 0_1
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-1]
                                StreamsTask taskId: 0_2
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-2]
                                StreamsTask taskId: 0_3
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-3]
                                StreamsTask taskId: 0_4
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-4]
                                StreamsTask taskId: 0_5
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-5]

            Suspended:
            Restoring:
            New:
        Standby tasks:
            Running:
            Suspended:
            Restoring:
            New:


How many instances of processors (Processor1, Processor2, Processor3) will be created?

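In your case, six of each. Each task instantiates a full copy of the Topology. (cf. https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L355; note: a Topology is the logical representation of the program, and it is instantiated as a ProcessorTopology at runtime.)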

As per my understanding, there will be SIX stream tasks. Is a new instance of processor created for each Stream task or they "share" the same Processor instance?

Each task has its own Processor instances; they are not shared.
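To illustrate where the count comes from: the second argument to addProcessor() is a supplier, and each task calls it to get a fresh instance. The following is a plain-Java simulation of that idea only, with no Kafka dependency. FakeProcessor, FakeTask, createTasks and countDistinctInstances are hypothetical names invented for this sketch, not Kafka Streams classes or methods.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Plain-Java simulation; none of these classes belong to Kafka Streams.
class ProcessorInstanceCountSketch {

    // Hypothetical stand-in for a user-defined processor.
    static class FakeProcessor {
        final String name;
        FakeProcessor(String name) { this.name = name; }
    }

    // Hypothetical stand-in for a task: it builds its own processor chain
    // by calling every supplier exactly once.
    static class FakeTask {
        final String taskId;
        final List<FakeProcessor> processors = new ArrayList<>();

        FakeTask(String taskId, List<Supplier<FakeProcessor>> suppliers) {
            this.taskId = taskId;
            for (Supplier<FakeProcessor> supplier : suppliers) {
                processors.add(supplier.get()); // new instance for this task only
            }
        }
    }

    // One task per input partition, all built from the same three suppliers.
    static List<FakeTask> createTasks(int numPartitions) {
        List<Supplier<FakeProcessor>> suppliers = new ArrayList<>();
        suppliers.add(() -> new FakeProcessor("Processor1"));
        suppliers.add(() -> new FakeProcessor("Processor2"));
        suppliers.add(() -> new FakeProcessor("Processor3"));

        List<FakeTask> tasks = new ArrayList<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            tasks.add(new FakeTask("0_" + partition, suppliers));
        }
        return tasks;
    }

    // Counts processor objects by identity, so a shared instance would count once.
    static int countDistinctInstances(List<FakeTask> tasks) {
        Set<FakeProcessor> distinct = Collections.newSetFromMap(new IdentityHashMap<>());
        for (FakeTask task : tasks) {
            distinct.addAll(task.processors);
        }
        return distinct.size();
    }

    public static void main(String[] args) {
        List<FakeTask> tasks = createTasks(6);
        System.out.println("tasks = " + tasks.size());                      // tasks = 6
        System.out.println("instances = " + countDistinctInstances(tasks)); // instances = 18
    }
}
```

With six partitions and three processors, the sketch ends up with 18 distinct objects, that is, six of each processor. This is also why the supplier should return a new object on every call instead of handing out one cached instance.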

When a Stream Thread is created, does it create a new instance of processor?

No. New Processor instances are created when a task is created.

Are Stream Tasks created as part of Stream Threads creation?

No. Tasks are created during a rebalance, according to the partition/task assignment. KafkaStreams registers a StreamsRebalanceListener on its internal consumer, which calls TaskManager#createTasks().
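The ordering described here (the thread exists first, tasks appear only after partitions are assigned) can be sketched in plain Java. This is a simplified simulation, not the Kafka Streams implementation: FakeStreamThread and onPartitionsAssigned are hypothetical names, and the "0_" prefix assumes a single sub-topology, matching the task IDs 0_0 to 0_5 in the printed output above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation; none of these classes belong to Kafka Streams.
class RebalanceTaskCreationSketch {

    // Hypothetical stand-in for a stream thread. Constructing it creates no tasks.
    static class FakeStreamThread {
        final List<String> activeTaskIds = new ArrayList<>();

        // Hypothetical callback, invoked once a rebalance hands partitions to this thread.
        void onPartitionsAssigned(List<Integer> assignedPartitions) {
            for (int partition : assignedPartitions) {
                // Assumption: one sub-topology (id 0), so the task id is "0_<partition>".
                activeTaskIds.add("0_" + partition);
            }
        }
    }

    public static void main(String[] args) {
        FakeStreamThread thread = new FakeStreamThread();
        System.out.println("before rebalance: " + thread.activeTaskIds.size()); // before rebalance: 0

        thread.onPartitionsAssigned(Arrays.asList(0, 1, 2, 3, 4, 5));
        System.out.println("after rebalance: " + thread.activeTaskIds);
        // after rebalance: [0_0, 0_1, 0_2, 0_3, 0_4, 0_5]
    }
}
```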

Update (the question was extended):

In this scenario a single stream thread will have SIX stream tasks. Does a stream thread execute these stream tasks one-by-one, sort of "in-a-loop". Do stream tasks run as a separate "thread". Basically, not able to understand how a single stream thread run multiple stream tasks at the same time/parallely?

Yes, the StreamThread executes the tasks in a loop. There are no other threads. Hence, tasks assigned to the same thread are not executed at the same time/in parallel, but one after another. (cf. https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java#L472; each StreamThread uses exactly one TaskManager, which internally uses AssignedStreamsTasks and AssignedStandbyTasks.)
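A minimal sketch of that "one after another" behaviour, again as a plain-Java simulation with no Kafka dependency. FakeTask, processOne and runLoop are hypothetical names. The real loop does more than this (for example polling the consumer and committing), so treat it only as an illustration of how one thread serves several tasks without extra threads.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-Java simulation; none of these classes belong to Kafka Streams.
class SingleThreadTaskLoopSketch {

    // Hypothetical stand-in for a task with some buffered input records.
    static class FakeTask {
        final String taskId;
        final Deque<String> buffered = new ArrayDeque<>();

        FakeTask(String taskId, String... records) {
            this.taskId = taskId;
            buffered.addAll(Arrays.asList(records));
        }

        // Processes at most one record; returns false if nothing was buffered.
        boolean processOne(List<String> log) {
            String record = buffered.poll();
            if (record == null) {
                return false;
            }
            log.add(taskId + ":" + record);
            return true;
        }
    }

    // The calling thread visits the tasks in turn until no task makes progress.
    static List<String> runLoop(List<FakeTask> tasks) {
        List<String> log = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (FakeTask task : tasks) {
                if (task.processOne(log)) {
                    progressed = true;
                }
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<FakeTask> tasks = Arrays.asList(
            new FakeTask("0_0", "a", "b"),
            new FakeTask("0_1", "c"));
        System.out.println(runLoop(tasks)); // [0_0:a, 0_1:c, 0_0:b]
    }
}
```

The interleaved log shows that the tasks take turns on the same thread. To process the six tasks in parallel within one application instance, you would raise NUM_STREAM_THREADS_CONFIG so that the tasks are spread over several threads.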