使用 reactor-kafka 使用不同的线程从 Kafka 中的消费者组读取
Using different threads to read from a consumer group in Kafka using reactor-kafka
我需要从一个包含数百万数据的 Kafka 主题中消费。一旦我阅读了该主题,我需要将其转换并写入另一个主题。我能够使用来自该主题的消息,通过多个线程处理数据并写入另一个主题。
我按照这里的例子 https://projectreactor.io/docs/kafka/1.3.5-SNAPSHOT/reference/index.html#concurrent-ordered
这是我的代码:
public Flux<?> flux() {
KafkaSender<Integer, Person> sender = sender(senderOptions());
return KafkaReceiver.create(receiverOptions(Collections.singleton(sourceTopic)))
.receive()
.map(m -> SenderRecord.create(transform(m.value()), m.receiverOffset()))
.as(sender::send)
.doOnNext(m -> m.correlationMetadata().acknowledge())
.doOnCancel(() -> close());
}
我有多个消费者可以读取,并且由于数据量大,正在考虑添加不同的 reader 线程来读取主题。然而,reactor-kafka documentation 提到 KafkaReceiver 不是线程安全的,因为底层的 KafkaConsumer 不能被多个线程同时访问。
我正在寻找有关同时阅读某个主题的建议。
所以基本上你正在寻找的消费者组,你可以运行的最大并行消费受到你的主题的分区数量的限制。
Kafka Consumer Group机制可以让你把消费一个topic的工作分给属于同一个group的不同“readers”,分工就是group中的每个consumer单独负责一个partition (1 个或多个,基于组中消费者的数量,以及主题的分区数)
我需要从一个包含数百万数据的 Kafka 主题中消费。一旦我阅读了该主题,我需要将其转换并写入另一个主题。我能够使用来自该主题的消息,通过多个线程处理数据并写入另一个主题。 我按照这里的例子 https://projectreactor.io/docs/kafka/1.3.5-SNAPSHOT/reference/index.html#concurrent-ordered
这是我的代码:
public Flux<?> flux() {
KafkaSender<Integer, Person> sender = sender(senderOptions());
return KafkaReceiver.create(receiverOptions(Collections.singleton(sourceTopic)))
.receive()
.map(m -> SenderRecord.create(transform(m.value()), m.receiverOffset()))
.as(sender::send)
.doOnNext(m -> m.correlationMetadata().acknowledge())
.doOnCancel(() -> close());
}
我有多个消费者可以读取,并且由于数据量大,正在考虑添加不同的 reader 线程来读取主题。然而,reactor-kafka documentation 提到 KafkaReceiver 不是线程安全的,因为底层的 KafkaConsumer 不能被多个线程同时访问。
我正在寻找有关同时阅读某个主题的建议。
所以基本上你正在寻找的消费者组,你可以运行的最大并行消费受到你的主题的分区数量的限制。
Kafka Consumer Group机制可以让你把消费一个topic的工作分给属于同一个group的不同“readers”,分工就是group中的每个consumer单独负责一个partition (1 个或多个,基于组中消费者的数量,以及主题的分区数)