Spring Cloud Stream KStream Consumer Concurrency 没有作用?
Spring Cloud Stream KStream Consumer Concurrency has no effect?
我的消费者配置如 Spring cloud stream consumer properties documentation.
中所述
spring-cloud-dependencies:Finchley.SR1
springBootVersion = '2.0.5.RELEASE'
我有 4 个分区用于 kstream_test
主题,它们充满了来自生产者的消息,如下所示:
root@kafka:/# kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kstream_test --time -1
kstream_test:2:222
kstream_test:1:203
kstream_test:3:188
kstream_test:0:278
我的spring基于云流kafka binder的配置是:
spring.cloud.stream.bindings.input:
destination: kstream_test
group: consumer-group-G1_test
consumer:
useNativeDecoding: true
headerMode: raw
startOffset: latest
partitioned: true
concurrency: 3
KStream 监听器class
@StreamListener
@SendTo(MessagingStreams.OUTPUT)
public KStream<?, ?> process(@Input(MessagingStreams.INPUT) KStream<?, ?> kstreams) {
......
log.info("Got a message");
......
return kstreams;
}
我的生产者在 1 运行 中发送了 100 条消息。但是日志似乎只有 1 个线程 StreamThread-1
处理消息,尽管我的并发数为 3。这里可能有什么问题? 100 条消息是否不足以查看并发性?
2018-10-18 11:50:01.923 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.923 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.945 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.956 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.972 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
更新:
根据回答,下面的 num.stream.threads
配置适用于活页夹级别。
spring.cloud.stream.kafka.streams.binder.configuration:
num.stream.threads: 3
好像需要设置num.stream.threads
来增加并发...
/** {@code num.stream.threads} */
@SuppressWarnings("WeakerAccess")
public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
...默认为 1。
活页夹真的应该根据...consumer.concurrency
属性设置;请针对 the binder.
打开一个 github 问题
同时,您可以直接在...consumer.configuration
中设置属性。
更正
我刚刚被告知 ...consumer.configuration
当前也未应用于流绑定器;您必须将其设置在活页夹级别。
我的消费者配置如 Spring cloud stream consumer properties documentation.
中所述spring-cloud-dependencies:Finchley.SR1
springBootVersion = '2.0.5.RELEASE'
我有 4 个分区用于 kstream_test
主题,它们充满了来自生产者的消息,如下所示:
root@kafka:/# kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kstream_test --time -1
kstream_test:2:222
kstream_test:1:203
kstream_test:3:188
kstream_test:0:278
我的spring基于云流kafka binder的配置是:
spring.cloud.stream.bindings.input:
destination: kstream_test
group: consumer-group-G1_test
consumer:
useNativeDecoding: true
headerMode: raw
startOffset: latest
partitioned: true
concurrency: 3
KStream 监听器class
@StreamListener
@SendTo(MessagingStreams.OUTPUT)
public KStream<?, ?> process(@Input(MessagingStreams.INPUT) KStream<?, ?> kstreams) {
......
log.info("Got a message");
......
return kstreams;
}
我的生产者在 1 运行 中发送了 100 条消息。但是日志似乎只有 1 个线程 StreamThread-1
处理消息,尽管我的并发数为 3。这里可能有什么问题? 100 条消息是否不足以查看并发性?
2018-10-18 11:50:01.923 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.923 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.945 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.956 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.972 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
更新:
根据回答,下面的 num.stream.threads
配置适用于活页夹级别。
spring.cloud.stream.kafka.streams.binder.configuration:
num.stream.threads: 3
好像需要设置num.stream.threads
来增加并发...
/** {@code num.stream.threads} */
@SuppressWarnings("WeakerAccess")
public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
...默认为 1。
活页夹真的应该根据...consumer.concurrency
属性设置;请针对 the binder.
同时,您可以直接在...consumer.configuration
中设置属性。
更正
我刚刚被告知 ...consumer.configuration
当前也未应用于流绑定器;您必须将其设置在活页夹级别。