我如何在 Kafka 中使用多个消费者?
How do I use multiple consumers in Kafka?
我是一名学习 Kafka 的新生,我已经 运行 了解一些关于理解多个消费者的基本问题,到目前为止,文章、文档等对这些问题没有太大帮助。
我尝试做的一件事是同时编写我自己的高级 Kafka 生产者和消费者以及 运行 它们,向一个主题发布 100 条简单消息并让我的消费者检索它们。我已经成功地做到了这一点,但是当我尝试引入第二个消费者来消费刚刚发布消息的同一主题时,它没有收到任何消息。
据我了解,对于每个主题,您可以拥有来自不同消费者组的消费者,并且这些消费者组中的每一个都会获得针对某个主题生成的消息的完整副本。这个对吗?如果不是,我设置多个消费者的正确方法是什么?这是我目前写的消费者class:
public class AlternateConsumer extends Thread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private final Boolean isAsync = false;
public AlternateConsumer(String topic, String consumerGroup) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", consumerGroup);
properties.put("partition.assignment.strategy", "roundrobin");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<Integer, String>(properties);
consumer.subscribe(topic);
this.topic = topic;
}
public void run() {
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(0);
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
}
}
}
}
此外,我注意到最初我是在测试只有一个分区的主题 'test' 的上述消耗。当我将另一个消费者添加到现有消费者组时说 'testGroup',这触发了 Kafka 重新平衡,这大大降低了我的消费延迟,以秒为单位。我认为这是重新平衡的问题,因为我只有一个分区,但是当我创建一个新主题 'multiplepartitions' 时,比如有 6 个分区,类似的问题出现了,即向同一消费者组添加更多消费者会导致延迟问题。我环顾四周,人们告诉我我应该使用多线程消费者——有人能阐明这一点吗?
我认为您的问题在于 auto.offset.reset 属性。当一个新的消费者从一个分区读取并且没有以前提交的偏移量时,auto.offset.reset 属性 用于决定起始偏移量应该是什么。如果您将其设置为 "largest"(默认值),您将在最新(最后)消息处开始阅读。如果将其设置为 "smallest",您将收到第一条可用消息。
所以添加:
properties.put("auto.offset.reset", "smallest");
然后重试。
* 编辑 *
"smallest" 和 "largest" 不久前被弃用了。您现在应该使用 "earliest" 或 "latest"。如有任何问题,请查看 docs
在文档 here 中说:"if you provide more threads than there are partitions on the topic, some threads will never see a message"。你能为你的主题添加分区吗?我的消费者组线程数等于主题中的分区数,并且每个线程都在接收消息。
这是我的主题配置:
buffalo-macbook10:kafka_2.10-0.8.2.1 aakture$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic recent-wins
Topic:recent-wins PartitionCount:3 ReplicationFactor:1 Configs:
Topic: recent-wins Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: recent-wins Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: recent-wins Partition: 2 Leader: 0 Replicas: 0 Isr: 0
我的消费者:
package com.cie.dispatcher.services;
import com.cie.dispatcher.model.WinNotification;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.dropwizard.lifecycle.Managed;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* This will create three threads, assign them to a "group" and listen for notifications on a topic.
* Current setup is to have three partitions in Kafka, so we need a thread per partition (as recommended by
* the kafka folks). This implements the dropwizard Managed interface, so it can be started and stopped by the
* lifecycle manager in dropwizard.
* <p/>
* Created by aakture on 6/15/15.
*/
public class KafkaTopicListener implements Managed {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicListener.class);
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
private int threadCount;
private WinNotificationWorkflow winNotificationWorkflow;
private ObjectMapper objectMapper;
@Inject
public KafkaTopicListener(String a_zookeeper,
String a_groupId, String a_topic,
int threadCount,
WinNotificationWorkflow winNotificationWorkflow,
ObjectMapper objectMapper) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
this.threadCount = threadCount;
this.winNotificationWorkflow = winNotificationWorkflow;
this.objectMapper = objectMapper;
}
/**
* Creates the config for a connection
*
* @param zookeeper the host:port for zookeeper, "localhost:2181" for example.
* @param groupId the group id to use for the consumer group. Can be anything, it's used by kafka to organize the consumer threads.
* @return the config props
*/
private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public void stop() {
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
try {
if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
}
} catch (InterruptedException e) {
LOG.info("Interrupted during shutdown, exiting uncleanly");
}
LOG.info("{} shutdown successfully", this.getClass().getName());
}
/**
* Starts the listener
*/
public void start() {
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, new Integer(threadCount));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(threadCount);
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ListenerThread(stream, threadNumber));
threadNumber++;
}
}
private class ListenerThread implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ListenerThread(KafkaStream a_stream, int a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
try {
String message = null;
LOG.info("started listener thread: {}", m_threadNumber);
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext()) {
try {
message = new String(it.next().message());
LOG.info("receive message by " + m_threadNumber + " : " + message);
WinNotification winNotification = objectMapper.readValue(message, WinNotification.class);
winNotificationWorkflow.process(winNotification);
} catch (Exception ex) {
LOG.error("error processing queue for message: " + message, ex);
}
}
LOG.info("Shutting down listener thread: " + m_threadNumber);
} catch (Exception ex) {
LOG.error("error:", ex);
}
}
}
}
如果您希望多个消费者使用相同的消息(如广播),您可以使用不同的消费者组生成它们,并在消费者配置中将 auto.offset.reset 设置为最小。
如果你想让多个消费者并行完成消费(在他们之间分工),你应该创建分区数>=消费者数。一个分区最多只能被一个消费进程消费。但是一个消费者可以消费多个分区。
我是一名学习 Kafka 的新生,我已经 运行 了解一些关于理解多个消费者的基本问题,到目前为止,文章、文档等对这些问题没有太大帮助。
我尝试做的一件事是同时编写我自己的高级 Kafka 生产者和消费者以及 运行 它们,向一个主题发布 100 条简单消息并让我的消费者检索它们。我已经成功地做到了这一点,但是当我尝试引入第二个消费者来消费刚刚发布消息的同一主题时,它没有收到任何消息。
据我了解,对于每个主题,您可以拥有来自不同消费者组的消费者,并且这些消费者组中的每一个都会获得针对某个主题生成的消息的完整副本。这个对吗?如果不是,我设置多个消费者的正确方法是什么?这是我目前写的消费者class:
public class AlternateConsumer extends Thread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private final Boolean isAsync = false;
public AlternateConsumer(String topic, String consumerGroup) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", consumerGroup);
properties.put("partition.assignment.strategy", "roundrobin");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<Integer, String>(properties);
consumer.subscribe(topic);
this.topic = topic;
}
public void run() {
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(0);
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
}
}
}
}
此外,我注意到最初我是在测试只有一个分区的主题 'test' 的上述消耗。当我将另一个消费者添加到现有消费者组时说 'testGroup',这触发了 Kafka 重新平衡,这大大降低了我的消费延迟,以秒为单位。我认为这是重新平衡的问题,因为我只有一个分区,但是当我创建一个新主题 'multiplepartitions' 时,比如有 6 个分区,类似的问题出现了,即向同一消费者组添加更多消费者会导致延迟问题。我环顾四周,人们告诉我我应该使用多线程消费者——有人能阐明这一点吗?
我认为您的问题在于 auto.offset.reset 属性。当一个新的消费者从一个分区读取并且没有以前提交的偏移量时,auto.offset.reset 属性 用于决定起始偏移量应该是什么。如果您将其设置为 "largest"(默认值),您将在最新(最后)消息处开始阅读。如果将其设置为 "smallest",您将收到第一条可用消息。
所以添加:
properties.put("auto.offset.reset", "smallest");
然后重试。
* 编辑 *
"smallest" 和 "largest" 不久前被弃用了。您现在应该使用 "earliest" 或 "latest"。如有任何问题,请查看 docs
在文档 here 中说:"if you provide more threads than there are partitions on the topic, some threads will never see a message"。你能为你的主题添加分区吗?我的消费者组线程数等于主题中的分区数,并且每个线程都在接收消息。
这是我的主题配置:
buffalo-macbook10:kafka_2.10-0.8.2.1 aakture$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic recent-wins
Topic:recent-wins PartitionCount:3 ReplicationFactor:1 Configs:
Topic: recent-wins Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: recent-wins Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: recent-wins Partition: 2 Leader: 0 Replicas: 0 Isr: 0
我的消费者:
package com.cie.dispatcher.services;
import com.cie.dispatcher.model.WinNotification;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.dropwizard.lifecycle.Managed;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* This will create three threads, assign them to a "group" and listen for notifications on a topic.
* Current setup is to have three partitions in Kafka, so we need a thread per partition (as recommended by
* the kafka folks). This implements the dropwizard Managed interface, so it can be started and stopped by the
* lifecycle manager in dropwizard.
* <p/>
* Created by aakture on 6/15/15.
*/
public class KafkaTopicListener implements Managed {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicListener.class);
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
private int threadCount;
private WinNotificationWorkflow winNotificationWorkflow;
private ObjectMapper objectMapper;
@Inject
public KafkaTopicListener(String a_zookeeper,
String a_groupId, String a_topic,
int threadCount,
WinNotificationWorkflow winNotificationWorkflow,
ObjectMapper objectMapper) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
this.threadCount = threadCount;
this.winNotificationWorkflow = winNotificationWorkflow;
this.objectMapper = objectMapper;
}
/**
* Creates the config for a connection
*
* @param zookeeper the host:port for zookeeper, "localhost:2181" for example.
* @param groupId the group id to use for the consumer group. Can be anything, it's used by kafka to organize the consumer threads.
* @return the config props
*/
private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public void stop() {
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
try {
if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
}
} catch (InterruptedException e) {
LOG.info("Interrupted during shutdown, exiting uncleanly");
}
LOG.info("{} shutdown successfully", this.getClass().getName());
}
/**
* Starts the listener
*/
public void start() {
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, new Integer(threadCount));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(threadCount);
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ListenerThread(stream, threadNumber));
threadNumber++;
}
}
private class ListenerThread implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ListenerThread(KafkaStream a_stream, int a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
try {
String message = null;
LOG.info("started listener thread: {}", m_threadNumber);
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext()) {
try {
message = new String(it.next().message());
LOG.info("receive message by " + m_threadNumber + " : " + message);
WinNotification winNotification = objectMapper.readValue(message, WinNotification.class);
winNotificationWorkflow.process(winNotification);
} catch (Exception ex) {
LOG.error("error processing queue for message: " + message, ex);
}
}
LOG.info("Shutting down listener thread: " + m_threadNumber);
} catch (Exception ex) {
LOG.error("error:", ex);
}
}
}
}
如果您希望多个消费者使用相同的消息(如广播),您可以使用不同的消费者组生成它们,并在消费者配置中将 auto.offset.reset 设置为最小。 如果你想让多个消费者并行完成消费(在他们之间分工),你应该创建分区数>=消费者数。一个分区最多只能被一个消费进程消费。但是一个消费者可以消费多个分区。