Kafka 消费者 - Java 客户端
Kafka Consumer - Java Client
我在kafka消费者文档中看到了这个注释-
Since there are many partitions this still balances the load over many
consumer instances. Note however that there cannot be more consumer
instances than partitions.
我有 50 个分区用于单个主题。如果我将 a_numThreads 的值设为 50,是否会从每个分区中提取 1 条消息?以上消息是否意味着在我的情况下,我在任何时间点都不能创建超过 50 个线程?
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}
你正在做 a_numThreads = 50
然后 Executors.newFixedThreadPool(a_numThreads);
是的,这意味着你不能在任何时间点创建超过 50 个线程,至少不能与那个执行者一起创建。
文档中所说的是,一个分区只能分配给 1 个流,如果您不是创建 50 个流而是创建 51 个流,后者将一无所获,如所解释的那样 here
我在kafka消费者文档中看到了这个注释-
Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances than partitions.
我有 50 个分区用于单个主题。如果我将 a_numThreads 的值设为 50,是否会从每个分区中提取 1 条消息?以上消息是否意味着在我的情况下,我在任何时间点都不能创建超过 50 个线程?
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}
你正在做 a_numThreads = 50
然后 Executors.newFixedThreadPool(a_numThreads);
是的,这意味着你不能在任何时间点创建超过 50 个线程,至少不能与那个执行者一起创建。
文档中所说的是,一个分区只能分配给 1 个流,如果您不是创建 50 个流而是创建 51 个流,后者将一无所获,如所解释的那样 here