如何在kafka 0.9.0中使用多线程消费者?
How to use multi-thread consumer in kafka 0.9.0?
kafka 的文档给出了一个方法,描述如下:
One Consumer Per Thread:A simple option is to give each thread its own consumer > instance.
我的代码:
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final CloudKafkaConsumer consumer;
private final String topicName;
public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) {
this.consumer = consumer;
this.topicName = topicName;
}
@Override
public void run() {
try {
this.consumer.subscribe(topicName);
ConsumerRecords<String, String> records;
while (!closed.get()) {
synchronized (consumer) {
records = consumer.poll(100);
}
for (ConsumerRecord<String, String> tmp : records) {
System.out.println(tmp.value());
}
}
} catch (WakeupException e) {
// Ignore exception if closing
System.out.println(e);
//if (!closed.get()) throw e;
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
public static void main(String[] args) {
CloudKafkaConsumer kafkaConsumer = KafkaConsumerBuilder.builder()
.withBootstrapServers("172.31.1.159:9092")
.withGroupId("test")
.build();
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log"));
executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log.info"));
executorService.shutdown();
}
}
但它不起作用并抛出异常:
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
此外,我阅读了Flink(分布式流和批数据处理的开源平台)的源码。 Flink使用多线程消费者和我的差不多
long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT)));
pollLoop: while (running) {
ConsumerRecords<byte[], byte[]> records;
//noinspection SynchronizeOnNonFinalField
synchronized (flinkKafkaConsumer.consumer) {
try {
records = flinkKafkaConsumer.consumer.poll(pollTimeout);
} catch (WakeupException we) {
if (running) {
throw we;
}
// leave loop
continue;
}
}
怎么了?
Kafka 消费者不是线程安全的。正如您在问题中指出的那样,该文件指出
A simple option is to give each thread its own consumer instance
但是在您的代码中,您拥有由不同 KafkaConsumerRunner 实例包装的相同消费者实例。因此,多个线程正在访问同一个消费者实例。 kafka 文档明确说明
The Kafka consumer is NOT thread-safe. All network I/O happens in the
thread of the application making the call. It is the responsibility of
the user to ensure that multi-threaded access is properly
synchronized. Un-synchronized access will result in
ConcurrentModificationException.
这正是您收到的异常。
它在您调用订阅时抛出异常。 this.consumer.subscribe(topicName);
将该块移动到同步块中,如下所示:
@Override
public void run() {
try {
synchronized (consumer) {
this.consumer.subscribe(topicName);
}
ConsumerRecords<String, String> records;
while (!closed.get()) {
synchronized (consumer) {
records = consumer.poll(100);
}
for (ConsumerRecord<String, String> tmp : records) {
System.out.println(tmp.value());
}
}
} catch (WakeupException e) {
// Ignore exception if closing
System.out.println(e);
//if (!closed.get()) throw e;
}
}
也许不是你的情况,但如果你正在合并处理多个主题的数据,那么你可以使用同一个消费者从多个主题读取数据。如果没有,那么最好创建单独的作业来处理每个主题。
kafka 的文档给出了一个方法,描述如下:
One Consumer Per Thread:A simple option is to give each thread its own consumer > instance.
我的代码:
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final CloudKafkaConsumer consumer;
private final String topicName;
public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) {
this.consumer = consumer;
this.topicName = topicName;
}
@Override
public void run() {
try {
this.consumer.subscribe(topicName);
ConsumerRecords<String, String> records;
while (!closed.get()) {
synchronized (consumer) {
records = consumer.poll(100);
}
for (ConsumerRecord<String, String> tmp : records) {
System.out.println(tmp.value());
}
}
} catch (WakeupException e) {
// Ignore exception if closing
System.out.println(e);
//if (!closed.get()) throw e;
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
public static void main(String[] args) {
CloudKafkaConsumer kafkaConsumer = KafkaConsumerBuilder.builder()
.withBootstrapServers("172.31.1.159:9092")
.withGroupId("test")
.build();
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log"));
executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log.info"));
executorService.shutdown();
}
}
但它不起作用并抛出异常:
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
此外,我阅读了Flink(分布式流和批数据处理的开源平台)的源码。 Flink使用多线程消费者和我的差不多
long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT)));
pollLoop: while (running) {
ConsumerRecords<byte[], byte[]> records;
//noinspection SynchronizeOnNonFinalField
synchronized (flinkKafkaConsumer.consumer) {
try {
records = flinkKafkaConsumer.consumer.poll(pollTimeout);
} catch (WakeupException we) {
if (running) {
throw we;
}
// leave loop
continue;
}
}
怎么了?
Kafka 消费者不是线程安全的。正如您在问题中指出的那样,该文件指出
A simple option is to give each thread its own consumer instance
但是在您的代码中,您拥有由不同 KafkaConsumerRunner 实例包装的相同消费者实例。因此,多个线程正在访问同一个消费者实例。 kafka 文档明确说明
The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.
这正是您收到的异常。
它在您调用订阅时抛出异常。 this.consumer.subscribe(topicName);
将该块移动到同步块中,如下所示:
@Override
public void run() {
try {
synchronized (consumer) {
this.consumer.subscribe(topicName);
}
ConsumerRecords<String, String> records;
while (!closed.get()) {
synchronized (consumer) {
records = consumer.poll(100);
}
for (ConsumerRecord<String, String> tmp : records) {
System.out.println(tmp.value());
}
}
} catch (WakeupException e) {
// Ignore exception if closing
System.out.println(e);
//if (!closed.get()) throw e;
}
}
也许不是你的情况,但如果你正在合并处理多个主题的数据,那么你可以使用同一个消费者从多个主题读取数据。如果没有,那么最好创建单独的作业来处理每个主题。