kafka new api 0.10 不提供每个主题的流和消费者对象列表
kafka new api 0.10 doesn't provide a list of stream and consumer objects per topic
之前我一直在用0.8API。当您将主题列表传递给它时,它 return 是一个流图(每个主题一个条目)。这允许我生成一个单独的线程并将每个主题的流分配给它。每个主题中的数据太多,产生一个单独的线程有助于多任务处理。
//0.8 code sample
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
我想升级到0.10。我检查了 KafkaStreams
和 KafkaConsumer
类。 KafkaConsumer
对象采用配置属性并提供采用主题列表的订阅方法,其 return 类型为空。我找不到一种方法来处理每个主题。
KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(topicsList);
conusmer.poll(long ms)
KafkaStreams
另一方面似乎也有同样的问题。
KStreamBuilder builder = new KStreamBuilder();
String [] topics = new String[] {"topic1", "topic2"};
KStream<byte[], byte[]> source = builder.stream(stringSerde, stringSerde, topics);
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
有 source.foreach()
方法可用,但它是所有主题的流。任何人,有什么想法吗?
首先,使用多线程消费者很棘手,因此您在 0.8
中使用的模式希望设计得很好 :)
最佳做法是使用单线程消费者,因此,如果单个消费者同时订阅主题列表,则 "no need" 可以分隔不同的主题。然而,在使用记录时,记录对象会提供有关它来自哪个主题的信息(它携带此元数据)。因此,理论上您可以根据记录的主题将记录分派到不同的线程进行实际处理(即使不推荐这样做!)。
Kafka 通过 partitions 横向扩展,因此,如果单线程消费者无法处理负载,您应该启动多个消费者(作为 消费者组) 以扩展您的消费者处理能力。
一个更普遍的问题:如果你想按主题处理数据,为什么不使用多个消费者,每个消费者订阅一个主题?
最后但同样重要的是,在 Apache Kafka 0.10+
中,Kafka Streams API 是一个新引入的流处理库——尽管它一定不是与 0.8 KafkaStream
class 混淆(提示,没有 "s")。两者完全没有关系。
之前我一直在用0.8API。当您将主题列表传递给它时,它 return 是一个流图(每个主题一个条目)。这允许我生成一个单独的线程并将每个主题的流分配给它。每个主题中的数据太多,产生一个单独的线程有助于多任务处理。
//0.8 code sample
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
我想升级到0.10。我检查了 KafkaStreams
和 KafkaConsumer
类。 KafkaConsumer
对象采用配置属性并提供采用主题列表的订阅方法,其 return 类型为空。我找不到一种方法来处理每个主题。
KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(topicsList);
conusmer.poll(long ms)
KafkaStreams
另一方面似乎也有同样的问题。
KStreamBuilder builder = new KStreamBuilder();
String [] topics = new String[] {"topic1", "topic2"};
KStream<byte[], byte[]> source = builder.stream(stringSerde, stringSerde, topics);
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
有 source.foreach()
方法可用,但它是所有主题的流。任何人,有什么想法吗?
首先,使用多线程消费者很棘手,因此您在 0.8
中使用的模式希望设计得很好 :)
最佳做法是使用单线程消费者,因此,如果单个消费者同时订阅主题列表,则 "no need" 可以分隔不同的主题。然而,在使用记录时,记录对象会提供有关它来自哪个主题的信息(它携带此元数据)。因此,理论上您可以根据记录的主题将记录分派到不同的线程进行实际处理(即使不推荐这样做!)。
Kafka 通过 partitions 横向扩展,因此,如果单线程消费者无法处理负载,您应该启动多个消费者(作为 消费者组) 以扩展您的消费者处理能力。
一个更普遍的问题:如果你想按主题处理数据,为什么不使用多个消费者,每个消费者订阅一个主题?
最后但同样重要的是,在 Apache Kafka 0.10+
中,Kafka Streams API 是一个新引入的流处理库——尽管它一定不是与 0.8 KafkaStream
class 混淆(提示,没有 "s")。两者完全没有关系。