为什么消费者在 DC/OS 上使用 Java 的客户端 API 从 Kafka 消费消息时挂起?
Why consumer hangs while consuming messages from Kafka on DC/OS using Client API for Java?
我在 AWS 上的 DC/OS (Mesos) 集群上安装了 Kafka。启用了三个代理并创建了一个名为 "topic1".
的主题
dcos kafka topic create topic1 --partitions 3 --replication 3
然后我写了一个 Producer class 来发送消息和一个 Consumer class 来接收它们。
public class Producer {
public static void sendMessage(String msg) throws InterruptedException, ExecutionException {
Map<String, Object> producerConfig = new HashMap<>();
System.out.println("setting Producerconfig.");
producerConfig.put("bootstrap.servers",
"172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
ByteArraySerializer serializer = new ByteArraySerializer();
System.out.println("Creating KafkaProcuder");
KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer);
for (int i = 0; i < 100; i++) {
String msgstr = msg + i;
byte[] message = msgstr.getBytes();
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic1", message);
System.out.println("Sent:" + msgstr);
kafkaProducer.send(record);
}
kafkaProducer.close();
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
sendMessage("Kafka test message 2/27 3:32");
}
}
public class Consumer {
public static String getMessage() {
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put("bootstrap.servers",
"172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
consumerConfig.put("group.id", "dj-group");
consumerConfig.put("enable.auto.commit", "true");
consumerConfig.put("auto.offset.reset", "earliest");
ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);
kafkaConsumer.subscribe(Arrays.asList("topic1"));
while (true) {
ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
System.out.println(records.count() + " of records received.");
for (ConsumerRecord<byte[], byte[]> record : records) {
System.out.println(Arrays.toString(record.value()));
}
}
}
public static void main(String[] args) {
getMessage();
}
}
首先我运行Producer
在集群上发送消息给topic1
。但是当我运行 Consumer
时,它什么也收不到,只是挂起。
Producer
正在工作,因为我能够通过 运行 Kafka 安装附带的 shell 脚本获取所有消息
./bin/kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning
但是为什么我不能用 Consumer
接收? post 表明 group.id 与旧偏移量可能是一个可能的原因。我只在消费者而不是生产者中创建 group.id。如何配置该组的偏移量?
事实证明,kafkaConsumer.subscribe(Arrays.asList("topic1"));
导致 poll()
挂起。根据Kafka消费者没有收到消息
,有两种连接话题的方式,assign
和subscribe
。在我用下面的行替换 subscribe
之后,它开始工作了。
TopicPartition tp = new TopicPartition("topic1", 0);
List<TopicPartition> tps = Arrays.asList(tp);
kafkaConsumer.assign(tps);
但是输出显示的是数字数组,这不是预期的(生产者发送的字符串)。但我想这是一个单独的问题。
确保您正常关闭您的消费者:
consumer.close()
TLDR
当你有两个消费者时 运行 相同的 group id Kafka 不会将主题的相同分区分配给两者。
如果您重复 运行 一个使用相同 group id 启动消费者的应用程序并且您没有正常关闭它们,Kafka 将需要一段时间将早期 运行 的消费者视为已死并将其分区重新分配给新的。
如果新消息到达该分区并且它从未分配给您的新消费者,消费者将永远不会看到这些消息。
要调试:
- 您的主题有多少个分区:
./kafka-topics --zookeeper <host-port> --describe <topic>
- 您的组从每个分区消耗了多少:
./kafka-consumer-groups --bootstrap-server <host-port> --describe --group <group-id>
如果您的分区已经卡在陈旧的消费者上,请擦除 Kafka 的状态或使用新的组 ID。
我在 AWS 上的 DC/OS (Mesos) 集群上安装了 Kafka。启用了三个代理并创建了一个名为 "topic1".
的主题dcos kafka topic create topic1 --partitions 3 --replication 3
然后我写了一个 Producer class 来发送消息和一个 Consumer class 来接收它们。
public class Producer {
public static void sendMessage(String msg) throws InterruptedException, ExecutionException {
Map<String, Object> producerConfig = new HashMap<>();
System.out.println("setting Producerconfig.");
producerConfig.put("bootstrap.servers",
"172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
ByteArraySerializer serializer = new ByteArraySerializer();
System.out.println("Creating KafkaProcuder");
KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer);
for (int i = 0; i < 100; i++) {
String msgstr = msg + i;
byte[] message = msgstr.getBytes();
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic1", message);
System.out.println("Sent:" + msgstr);
kafkaProducer.send(record);
}
kafkaProducer.close();
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
sendMessage("Kafka test message 2/27 3:32");
}
}
public class Consumer {
public static String getMessage() {
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put("bootstrap.servers",
"172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
consumerConfig.put("group.id", "dj-group");
consumerConfig.put("enable.auto.commit", "true");
consumerConfig.put("auto.offset.reset", "earliest");
ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);
kafkaConsumer.subscribe(Arrays.asList("topic1"));
while (true) {
ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
System.out.println(records.count() + " of records received.");
for (ConsumerRecord<byte[], byte[]> record : records) {
System.out.println(Arrays.toString(record.value()));
}
}
}
public static void main(String[] args) {
getMessage();
}
}
首先我运行Producer
在集群上发送消息给topic1
。但是当我运行 Consumer
时,它什么也收不到,只是挂起。
Producer
正在工作,因为我能够通过 运行 Kafka 安装附带的 shell 脚本获取所有消息
./bin/kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning
但是为什么我不能用 Consumer
接收? post 表明 group.id 与旧偏移量可能是一个可能的原因。我只在消费者而不是生产者中创建 group.id。如何配置该组的偏移量?
事实证明,kafkaConsumer.subscribe(Arrays.asList("topic1"));
导致 poll()
挂起。根据Kafka消费者没有收到消息
,有两种连接话题的方式,assign
和subscribe
。在我用下面的行替换 subscribe
之后,它开始工作了。
TopicPartition tp = new TopicPartition("topic1", 0);
List<TopicPartition> tps = Arrays.asList(tp);
kafkaConsumer.assign(tps);
但是输出显示的是数字数组,这不是预期的(生产者发送的字符串)。但我想这是一个单独的问题。
确保您正常关闭您的消费者:
consumer.close()
TLDR
当你有两个消费者时 运行 相同的 group id Kafka 不会将主题的相同分区分配给两者。
如果您重复 运行 一个使用相同 group id 启动消费者的应用程序并且您没有正常关闭它们,Kafka 将需要一段时间将早期 运行 的消费者视为已死并将其分区重新分配给新的。
如果新消息到达该分区并且它从未分配给您的新消费者,消费者将永远不会看到这些消息。
要调试:
- 您的主题有多少个分区:
./kafka-topics --zookeeper <host-port> --describe <topic>
- 您的组从每个分区消耗了多少:
./kafka-consumer-groups --bootstrap-server <host-port> --describe --group <group-id>
如果您的分区已经卡在陈旧的消费者上,请擦除 Kafka 的状态或使用新的组 ID。