Spring Cloud Stream KStream Consumer Concurrency 没有作用?

Spring Cloud Stream KStream Consumer Concurrency has no effect?

我的消费者配置如 Spring cloud stream consumer properties documentation.

中所述
spring-cloud-dependencies:Finchley.SR1
springBootVersion = '2.0.5.RELEASE'

我有 4 个分区用于 kstream_test 主题,它们充满了来自生产者的消息,如下所示:

root@kafka:/# kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kstream_test --time -1
kstream_test:2:222
kstream_test:1:203
kstream_test:3:188
kstream_test:0:278

我的spring基于云流kafka binder的配置是:

spring.cloud.stream.bindings.input:
  destination: kstream_test
  group: consumer-group-G1_test
  consumer:
    useNativeDecoding: true
    headerMode: raw
    startOffset: latest
    partitioned: true
    concurrency: 3

KStream 监听器class

    @StreamListener
    @SendTo(MessagingStreams.OUTPUT)
    public KStream<?, ?> process(@Input(MessagingStreams.INPUT) KStream<?, ?> kstreams) {
        ......
        log.info("Got a message");
        ......
        return kstreams;
    }

我的生产者在 1 运行 中发送了 100 条消息。但是日志似乎只有 1 个线程 StreamThread-1 处理消息,尽管我的并发数为 3。这里可能有什么问题? 100 条消息是否不足以查看并发性?

2018-10-18 11:50:01.923  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message
2018-10-18 11:50:01.923  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message
2018-10-18 11:50:01.945  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message
2018-10-18 11:50:01.956  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message
2018-10-18 11:50:01.972  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message

更新:

根据回答,下面的 num.stream.threads 配置适用于活页夹级别。

spring.cloud.stream.kafka.streams.binder.configuration:
 num.stream.threads: 3

好像需要设置num.stream.threads来增加并发...

/** {@code num.stream.threads} */
@SuppressWarnings("WeakerAccess")
public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";

...默认为 1。

活页夹真的应该根据...consumer.concurrency属性设置;请针对 the binder.

打开一个 github 问题

同时,您可以直接在...consumer.configuration中设置属性。

更正

我刚刚被告知 ...consumer.configuration 当前也未应用于流绑定器;您必须将其设置在活页夹级别。