Kafka 生产者消费者 API 问题
Kafka Producer Consumer API Issue
我正在使用 Kafka v0.10.0.0 并创建了生产者和消费者 Java 代码。但是代码卡在 producer.send 上,日志中没有任何异常。
谁能帮忙。提前致谢。
我是using/modifying"mapr - kakfa sample program"。您可以在此处查看完整代码。
https://github.com/panwars87/kafka-sample-programs
**重要提示:我在 Maven 依赖项中将 kafka-client 版本更改为 0.10.0.0,在我的本地中将 运行 Kafka 0.10.0.0 更改为 0.10.0.0。
public class Producer {
public static void main(String[] args) throws IOException {
// set up the producer
KafkaProducer<String, String> producer;
System.out.println("Starting Producers....");
try (InputStream props = Resources.getResource("producer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
producer = new KafkaProducer<>(properties);
System.out.println("Property loaded successfully ....");
}
try {
for (int i = 0; i < 20; i++) {
// send lots of messages
System.out.println("Sending record one by one....");
producer.send(new ProducerRecord<String, String>("fast-messages","sending message - "+i+" to fast-message."));
System.out.println(i+" message sent....");
// every so often send to a different topic
if (i % 2 == 0) {
producer.send(new ProducerRecord<String, String>("fast-messages","sending message - "+i+" to fast-message."));
producer.send(new ProducerRecord<String, String>("summary-markers","sending message - "+i+" to summary-markers."));
producer.flush();
System.out.println("Sent msg number " + i);
}
}
} catch (Throwable throwable) {
System.out.printf("%s", throwable.getStackTrace());
throwable.printStackTrace();
} finally {
producer.close();
}
}
}
public class Consumer {
public static void main(String[] args) throws IOException {
// and the consumer
KafkaConsumer<String, String> consumer;
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
if (properties.getProperty("group.id") == null) {
properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
}
consumer = new KafkaConsumer<>(properties);
}
consumer.subscribe(Arrays.asList("fast-messages", "summary-markers"));
int timeouts = 0;
//noinspection InfiniteLoopStatement
while (true) {
// read records with a short timeout. If we time out, we don't really care.
ConsumerRecords<String, String> records = consumer.poll(200);
if (records.count() == 0) {
timeouts++;
} else {
System.out.printf("Got %d records after %d timeouts\n", records.count(), timeouts);
timeouts = 0;
}
for (ConsumerRecord<String, String> record : records) {
switch (record.topic()) {
case "fast-messages":
System.out.println("Record value for fast-messages is :"+ record.value());
break;
case "summary-markers":
System.out.println("Record value for summary-markers is :"+ record.value());
break;
default:
throw new IllegalStateException("Shouldn't be possible to get message on topic ");
}
}
}
}
}
您 运行ning 的代码用于 mapR 的演示,它不是 Kafka。 MapR 声称 API 与 Kafka 0.9 兼容,但即便如此,mapR 对待消息偏移量的方式与 Kafka 不同(偏移量是消息的字节偏移量而不是增量偏移量)等。mapR 实现也非常非常不同地说至少。这意味着,如果幸运的话,Kafka 0.9 应用程序可能恰好在 mapR 上 运行,反之亦然。其他版本没有这样的保证。
感谢大家的所有意见。我通过调整 Mapr 代码并参考其他几篇文章解决了这个问题。 Link 解决方案 api:
https://github.com/panwars87/hadoopwork/tree/master/kafka/kafka-api
我正在使用 Kafka v0.10.0.0 并创建了生产者和消费者 Java 代码。但是代码卡在 producer.send 上,日志中没有任何异常。
谁能帮忙。提前致谢。
我是using/modifying"mapr - kakfa sample program"。您可以在此处查看完整代码。 https://github.com/panwars87/kafka-sample-programs
**重要提示:我在 Maven 依赖项中将 kafka-client 版本更改为 0.10.0.0,在我的本地中将 运行 Kafka 0.10.0.0 更改为 0.10.0.0。
public class Producer {
public static void main(String[] args) throws IOException {
// set up the producer
KafkaProducer<String, String> producer;
System.out.println("Starting Producers....");
try (InputStream props = Resources.getResource("producer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
producer = new KafkaProducer<>(properties);
System.out.println("Property loaded successfully ....");
}
try {
for (int i = 0; i < 20; i++) {
// send lots of messages
System.out.println("Sending record one by one....");
producer.send(new ProducerRecord<String, String>("fast-messages","sending message - "+i+" to fast-message."));
System.out.println(i+" message sent....");
// every so often send to a different topic
if (i % 2 == 0) {
producer.send(new ProducerRecord<String, String>("fast-messages","sending message - "+i+" to fast-message."));
producer.send(new ProducerRecord<String, String>("summary-markers","sending message - "+i+" to summary-markers."));
producer.flush();
System.out.println("Sent msg number " + i);
}
}
} catch (Throwable throwable) {
System.out.printf("%s", throwable.getStackTrace());
throwable.printStackTrace();
} finally {
producer.close();
}
}
}
public class Consumer {
public static void main(String[] args) throws IOException {
// and the consumer
KafkaConsumer<String, String> consumer;
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
if (properties.getProperty("group.id") == null) {
properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
}
consumer = new KafkaConsumer<>(properties);
}
consumer.subscribe(Arrays.asList("fast-messages", "summary-markers"));
int timeouts = 0;
//noinspection InfiniteLoopStatement
while (true) {
// read records with a short timeout. If we time out, we don't really care.
ConsumerRecords<String, String> records = consumer.poll(200);
if (records.count() == 0) {
timeouts++;
} else {
System.out.printf("Got %d records after %d timeouts\n", records.count(), timeouts);
timeouts = 0;
}
for (ConsumerRecord<String, String> record : records) {
switch (record.topic()) {
case "fast-messages":
System.out.println("Record value for fast-messages is :"+ record.value());
break;
case "summary-markers":
System.out.println("Record value for summary-markers is :"+ record.value());
break;
default:
throw new IllegalStateException("Shouldn't be possible to get message on topic ");
}
}
}
}
}
您 运行ning 的代码用于 mapR 的演示,它不是 Kafka。 MapR 声称 API 与 Kafka 0.9 兼容,但即便如此,mapR 对待消息偏移量的方式与 Kafka 不同(偏移量是消息的字节偏移量而不是增量偏移量)等。mapR 实现也非常非常不同地说至少。这意味着,如果幸运的话,Kafka 0.9 应用程序可能恰好在 mapR 上 运行,反之亦然。其他版本没有这样的保证。
感谢大家的所有意见。我通过调整 Mapr 代码并参考其他几篇文章解决了这个问题。 Link 解决方案 api:
https://github.com/panwars87/hadoopwork/tree/master/kafka/kafka-api