Kafka 生产者创建主题但无法发送消息
Kafka producer creates topic but is not able to send messages
我是 Scala 和 Kafka 的新手,我 运行 遇到了一些麻烦。
我正在尝试将 scala kafka 生产者连接到安装在 cloudera express 服务器上的 kafka 服务器。
我已经在 these instructions 的虚拟机中完成过一次,没有任何问题。
当我 运行 生产者创建了所需的主题但发送了 none 的消息时,我认为是这样。
下面是一些代码:
卡夫卡生产者
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
class KafkaProducerManager {
val props = new Properties()
props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
props.put("acks", "all")
props.put("retries", "2")
props.put("auto.commit.interval.ms", "1000")
props.put("linger.ms", "1")
props.put("block.on.buffer.full", "true")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("auto.create.topics.enable", "true")
val producer = new KafkaProducer[String, String](props)
def startCounter() {
println("Start Producer Counter")
for (i <- 1 to 100) {
producer.send(new ProducerRecord("test-counter", i.toString, "Package " + i))
println("Producer - Send: " + i)
}
println("Closing producer")
producer.close()
}
}
当我执行 运行 方法时,我看到 "Producer - Send: #" 作为它的输出并且我没有收到任何错误。
所以我假设这段代码已经将消息发送到 Kafka。
在 运行 生产者之前,我在 kafka 服务器上启动了以下内容:
kafka-console-consumer --zookeeper zk:2181 --topic test-counter
但是这里我什么也没看到。
当我检查主题时,制作人应该创建的主题在列表中。
kafka-topics -zookeeper zk:2181 --list
我和消费者也有类似的问题:
import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
class KafkaConsumerManager {
val props = new Properties()
props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
props.put("group.id", "testGroup")
props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "1000")
props.put("linger.ms", "1")
props.put("session.timeout.ms", "3000")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("zookeeper.connect", KafkaServer.ZOOKEEPER_ADDRESS)
val consumer = new KafkaConsumer[String, String](props)
def start() {
println("Start Consumer")
consumer.subscribe(Arrays.asList("test-counter"))
while (true) {
val records = consumer.poll(100)
val iterator = records.iterator()
while (iterator.hasNext) {
val record = iterator.next()
printf("Consumer: offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value())
}
}
}
}
当我通过 kafka-console-producer 在服务器上创建消息时,我看到它们出现在服务器上的 kafka-console-consumer 中,但没有出现在我编写的消费者中。
kafka-console-producer --broker-list ks:9092 --topic test-counter
KafkaServer.ZOOKEEPER_ADDRESS 与 kafka-console-consumer 的参数 zk:2181 相同, KafkaServer.KAFKA_ADDRESS 与 kafka- 的参数 ks:9092 相同控制台制作人。
我尝试了代码并发现:
应该在消费者中指定 key 和 value deserializers
属性:
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
session.timeout.ms
属性有问题。来自 here:
heartbeat.interval.ms - ... The value must be set lower than session.timeout.ms ... default: 3000
这意味着您应该增加 session.timeout.ms
value 或干脆 remove 该行,因为默认值为
属性 是 30000
大于默认值
heartbeat.interval.ms
.
执行更正后,代码有效。
如果您 运行 正在 windows 机器上?按照快速入门指南,我有同样的问题 producer/consumer 没有给出任何错误但是 运行ning 也一样,你需要在你的环境变量中设置 kafka_home KAFKA_HOME=C:\ kafka_2.13-2.6.0
然后 zooker/server/topic/consumer/producer 您 kafka/windows 下的所有内容 运行
示例:对于消费者
%KAFKA_HOME%/bin/windows/kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
我是 Scala 和 Kafka 的新手,我 运行 遇到了一些麻烦。
我正在尝试将 scala kafka 生产者连接到安装在 cloudera express 服务器上的 kafka 服务器。 我已经在 these instructions 的虚拟机中完成过一次,没有任何问题。
当我 运行 生产者创建了所需的主题但发送了 none 的消息时,我认为是这样。
下面是一些代码:
卡夫卡生产者
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
class KafkaProducerManager {
val props = new Properties()
props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
props.put("acks", "all")
props.put("retries", "2")
props.put("auto.commit.interval.ms", "1000")
props.put("linger.ms", "1")
props.put("block.on.buffer.full", "true")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("auto.create.topics.enable", "true")
val producer = new KafkaProducer[String, String](props)
def startCounter() {
println("Start Producer Counter")
for (i <- 1 to 100) {
producer.send(new ProducerRecord("test-counter", i.toString, "Package " + i))
println("Producer - Send: " + i)
}
println("Closing producer")
producer.close()
}
}
当我执行 运行 方法时,我看到 "Producer - Send: #" 作为它的输出并且我没有收到任何错误。 所以我假设这段代码已经将消息发送到 Kafka。
在 运行 生产者之前,我在 kafka 服务器上启动了以下内容:
kafka-console-consumer --zookeeper zk:2181 --topic test-counter
但是这里我什么也没看到。
当我检查主题时,制作人应该创建的主题在列表中。
kafka-topics -zookeeper zk:2181 --list
我和消费者也有类似的问题:
import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
class KafkaConsumerManager {
val props = new Properties()
props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
props.put("group.id", "testGroup")
props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "1000")
props.put("linger.ms", "1")
props.put("session.timeout.ms", "3000")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("zookeeper.connect", KafkaServer.ZOOKEEPER_ADDRESS)
val consumer = new KafkaConsumer[String, String](props)
def start() {
println("Start Consumer")
consumer.subscribe(Arrays.asList("test-counter"))
while (true) {
val records = consumer.poll(100)
val iterator = records.iterator()
while (iterator.hasNext) {
val record = iterator.next()
printf("Consumer: offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value())
}
}
}
}
当我通过 kafka-console-producer 在服务器上创建消息时,我看到它们出现在服务器上的 kafka-console-consumer 中,但没有出现在我编写的消费者中。
kafka-console-producer --broker-list ks:9092 --topic test-counter
KafkaServer.ZOOKEEPER_ADDRESS 与 kafka-console-consumer 的参数 zk:2181 相同, KafkaServer.KAFKA_ADDRESS 与 kafka- 的参数 ks:9092 相同控制台制作人。
我尝试了代码并发现:
应该在消费者中指定 key 和 value deserializers 属性:
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
session.timeout.ms
属性有问题。来自 here:heartbeat.interval.ms - ... The value must be set lower than session.timeout.ms ... default: 3000
这意味着您应该增加
session.timeout.ms
value 或干脆 remove 该行,因为默认值为 属性 是30000
大于默认值heartbeat.interval.ms
.
执行更正后,代码有效。
如果您 运行 正在 windows 机器上?按照快速入门指南,我有同样的问题 producer/consumer 没有给出任何错误但是 运行ning 也一样,你需要在你的环境变量中设置 kafka_home KAFKA_HOME=C:\ kafka_2.13-2.6.0 然后 zooker/server/topic/consumer/producer 您 kafka/windows 下的所有内容 运行 示例:对于消费者 %KAFKA_HOME%/bin/windows/kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092