kafka java 客户端不消费 - 只是挂在 consumer.poll
kafka java client not consuming - just hangs at consumer.poll
我正在阅读 Manning 的《统一日志处理》一书,它的第一个练习是 Java 中的一个简单的 kafka 消费者。 运行 时的程序只是在调用 consumer.poll()
的地方停止。
我运行从本书作者提供的 valgrind 环境中获取它,可在 git clone https://github.com/alexanderdean/Unified-Log-Processing.git
它有 zookeeper-3.4.6
和 kafka_2.10-0.8.2.1
我使用以下命令行创建了一个主题:
./kafka-topics.sh --create --topic raw --zookeeper localhost:2181 --replication-factor 1 --partitions 1
Created topic "raw".
kafka-console-producer
和 kafka-console-consumer
正在按预期工作。
./kafka-console-producer.sh --topic raw --broker-list localhost:9092
[2016-10-17 14:09:05,899] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
one
two
three
four
five
./kafka-console-consumer.sh --topic raw --from-beginning --zookeeper localhost:2181
one
two
three
four
five
^CConsumed 5 messages
我正在测试的 java 代码非常基础,只是创建了一个消费者。
StreamApp.java
package nile;
public class StreamApp {
public static void main(String[] args){
String servers = args[0];
String groupId = args[1];
String inTopic = args[2];
String goodTopic = args[3];
Consumer consumer = new Consumer(servers, groupId, inTopic);
consumer.run();
}
}
Consumer.java
package nile;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
public class Consumer {
private final KafkaConsumer<String, String> consumer; // a
private final String topic;
public Consumer(String servers, String groupId, String topic) {
this.consumer = new KafkaConsumer<String, String>(createConfig(servers, groupId));
this.topic = topic;
System.out.println("Topic to listen for:" + this.topic + ":");
}
public void run() {
System.out.println("Starting to listen for items ");
this.consumer.subscribe(Arrays.asList(this.topic)); // b
try {
while (true) {
System.out.println("Subscribed to: " + consumer.subscription());
System.out.println("Inside the loop");
ConsumerRecords<String, String> records = consumer.poll(100); // c
System.out.println("After consuming");
for (ConsumerRecord<String, String> record : records) {
System.out.println("Got an item from kafka: " + record.value());
}
}
} finally {
consumer.close();
}
}
private static Properties createConfig(String servers, String groupId) {
Properties props = new Properties();
props.put("bootstrap.servers", servers);
props.put("group.id", groupId); // e
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"); // a
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"); // a
return props;
}
}
我正在使用的库是(来自我的build.gradle)
dependencies { // b
compile 'org.apache.kafka:kafka-clients:0.9.0.0'
compile 'com.maxmind.geoip:geoip-api:1.2.14'
compile 'com.fasterxml.jackson.core:jackson-databind:2.6.3'
compile 'org.slf4j:slf4j-api:1.7.5'
}
我运行将代码设置为:
java -jar ./build/libs/nile-0.1.0.jar localhost:9092 ulp-ch03-3.3 raw enriched
输出为:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Topic to listen for:raw:
Starting to listen for items
Subscribed to: [raw]
Inside the loop
这就是它停止的地方。 consumer.poll()
没有返回任何东西,也没有超时。不知道这里出了什么问题。我的头发已经分叉 2 天了,如果能帮助我完成这项工作,我们将不胜感激。 :)
似乎您正在使用 0.9.x 消费者 API 来消费来自 0.8.x 服务器的消息,这是不允许的,因为 0.9.0.0 具有代理间协议从以前的版本变化。
使用旧的consumer(即Scala consumer)或者升级kafka server版本到0.9.x
我正在阅读 Manning 的《统一日志处理》一书,它的第一个练习是 Java 中的一个简单的 kafka 消费者。 运行 时的程序只是在调用 consumer.poll()
的地方停止。
我运行从本书作者提供的 valgrind 环境中获取它,可在 git clone https://github.com/alexanderdean/Unified-Log-Processing.git
它有 zookeeper-3.4.6
和 kafka_2.10-0.8.2.1
我使用以下命令行创建了一个主题:
./kafka-topics.sh --create --topic raw --zookeeper localhost:2181 --replication-factor 1 --partitions 1
Created topic "raw".
kafka-console-producer
和 kafka-console-consumer
正在按预期工作。
./kafka-console-producer.sh --topic raw --broker-list localhost:9092
[2016-10-17 14:09:05,899] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
one
two
three
four
five
./kafka-console-consumer.sh --topic raw --from-beginning --zookeeper localhost:2181
one
two
three
four
five
^CConsumed 5 messages
我正在测试的 java 代码非常基础,只是创建了一个消费者。
StreamApp.java
package nile;
public class StreamApp {
public static void main(String[] args){
String servers = args[0];
String groupId = args[1];
String inTopic = args[2];
String goodTopic = args[3];
Consumer consumer = new Consumer(servers, groupId, inTopic);
consumer.run();
}
}
Consumer.java
package nile;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
public class Consumer {
private final KafkaConsumer<String, String> consumer; // a
private final String topic;
public Consumer(String servers, String groupId, String topic) {
this.consumer = new KafkaConsumer<String, String>(createConfig(servers, groupId));
this.topic = topic;
System.out.println("Topic to listen for:" + this.topic + ":");
}
public void run() {
System.out.println("Starting to listen for items ");
this.consumer.subscribe(Arrays.asList(this.topic)); // b
try {
while (true) {
System.out.println("Subscribed to: " + consumer.subscription());
System.out.println("Inside the loop");
ConsumerRecords<String, String> records = consumer.poll(100); // c
System.out.println("After consuming");
for (ConsumerRecord<String, String> record : records) {
System.out.println("Got an item from kafka: " + record.value());
}
}
} finally {
consumer.close();
}
}
private static Properties createConfig(String servers, String groupId) {
Properties props = new Properties();
props.put("bootstrap.servers", servers);
props.put("group.id", groupId); // e
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"); // a
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"); // a
return props;
}
}
我正在使用的库是(来自我的build.gradle)
dependencies { // b
compile 'org.apache.kafka:kafka-clients:0.9.0.0'
compile 'com.maxmind.geoip:geoip-api:1.2.14'
compile 'com.fasterxml.jackson.core:jackson-databind:2.6.3'
compile 'org.slf4j:slf4j-api:1.7.5'
}
我运行将代码设置为:
java -jar ./build/libs/nile-0.1.0.jar localhost:9092 ulp-ch03-3.3 raw enriched
输出为:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Topic to listen for:raw:
Starting to listen for items
Subscribed to: [raw]
Inside the loop
这就是它停止的地方。 consumer.poll()
没有返回任何东西,也没有超时。不知道这里出了什么问题。我的头发已经分叉 2 天了,如果能帮助我完成这项工作,我们将不胜感激。 :)
似乎您正在使用 0.9.x 消费者 API 来消费来自 0.8.x 服务器的消息,这是不允许的,因为 0.9.0.0 具有代理间协议从以前的版本变化。 使用旧的consumer(即Scala consumer)或者升级kafka server版本到0.9.x