security.protocol 在 Go 中设置基本的 Kafka 消费者和生产者时出错?
security.protocol error when setting up basic Kafka consumer and producer in Go?
我正在尝试在 Go 中设置一个基本的 Kafka 客户端 - 按照此处详述的示例 https://docs.confluent.io/clients-confluent-kafka-go/current/overview.html#go-example-code and https://github.com/confluentinc/confluent-kafka-go。
我按照给定的方式编写了消费者和生产者示例,就像这样
func Produce() {
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "my-broker-name"})
if err != nil {
panic(err)
}
defer p.Close()
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
topic := "myTopic"
for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
}
p.Flush(15 * 1000)
}
func Consume() {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "my-broker-name",
"group.id": "myGroup",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
} else {
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
c.Close()
}
(my-broker-name 是我的主机名 + 端口的替代品,我不想在此处包含)
但是当 运行 生产函数它 returns 一个错误说
Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 31ms in state APIVERSION_QUERY)
并且当 运行 消费函数时我收到同样的错误,但也有一些说
Consumer error: 1/1 brokers are down (<nil>)
我敢肯定情况并非如此。
遗憾的是,我找不到任何关于这些错误的含义或如何修复它们的文档。我该如何解决错误,以便我能够为我的经纪人生产和消费?
更新:
我获得了我的证书并将其转换为.pem 文件,并将ConfigMap 更改为以下内容:
p, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "my-broker:32500",
"security.protocol": "SSL",
"ssl.certificate.location": "mycert.pem",
"ssl.ca.location": "ca-chain.pem"})
if err != nil {
panic(err)
}
不过,现在又回来了
client SSL authentication might be required (see ssl.key.location and ssl.certificate.location and consult the broker logs for more information)
这是否意味着证书有问题?还是我在某处遗漏了一个步骤?
您需要提供主机名和端口作为您的 bootstrap 服务器
"bootstrap.servers": "host1:9092"
要连接到 kafka 中的安全端口,您需要提供包含您的 ca 文件的信任库配置,或与此相关的任何安全连接应用程序
https://github.com/FluuxIO/kafka/blob/master/examples/base-client/base-client.go#L6
kafka.ConfigMap{
"bootstrap.servers"̇: "..",
"security.protocol": "SSL",
// If you're using SSL authentication, provide the client's key here
"ssl.key.location": "path-to-private-key.pem",
"ssl.certificate.location": "path-to-public-key.pem",
"ssl.key.password": "if any..",
}
新错误请看这里
What does "SSL_CTX_use_PrivateKey_file" "problems getting password error" indicate in Nginx error log?
这里的解决方案是我丢失了 ssl.key.location。我不得不向我的管理员询问密钥。一旦我包含了密钥,一切正常。我的最终配置如下所示:
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "hostname:port-number",
"security.protocol": "SSL",
"ssl.ca.location": "ca-chain.pem",
"ssl.key.location": "key-location",
"ssl.certificate.location": "mycert.pem"})
if err != nil {
panic(err)
}
我正在尝试在 Go 中设置一个基本的 Kafka 客户端 - 按照此处详述的示例 https://docs.confluent.io/clients-confluent-kafka-go/current/overview.html#go-example-code and https://github.com/confluentinc/confluent-kafka-go。
我按照给定的方式编写了消费者和生产者示例,就像这样
func Produce() {
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "my-broker-name"})
if err != nil {
panic(err)
}
defer p.Close()
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
topic := "myTopic"
for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
}
p.Flush(15 * 1000)
}
func Consume() {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "my-broker-name",
"group.id": "myGroup",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
} else {
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
c.Close()
}
(my-broker-name 是我的主机名 + 端口的替代品,我不想在此处包含)
但是当 运行 生产函数它 returns 一个错误说
Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 31ms in state APIVERSION_QUERY)
并且当 运行 消费函数时我收到同样的错误,但也有一些说
Consumer error: 1/1 brokers are down (<nil>)
我敢肯定情况并非如此。
遗憾的是,我找不到任何关于这些错误的含义或如何修复它们的文档。我该如何解决错误,以便我能够为我的经纪人生产和消费?
更新:
我获得了我的证书并将其转换为.pem 文件,并将ConfigMap 更改为以下内容:
p, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "my-broker:32500",
"security.protocol": "SSL",
"ssl.certificate.location": "mycert.pem",
"ssl.ca.location": "ca-chain.pem"})
if err != nil {
panic(err)
}
不过,现在又回来了
client SSL authentication might be required (see ssl.key.location and ssl.certificate.location and consult the broker logs for more information)
这是否意味着证书有问题?还是我在某处遗漏了一个步骤?
您需要提供主机名和端口作为您的 bootstrap 服务器
"bootstrap.servers": "host1:9092"
要连接到 kafka 中的安全端口,您需要提供包含您的 ca 文件的信任库配置,或与此相关的任何安全连接应用程序
https://github.com/FluuxIO/kafka/blob/master/examples/base-client/base-client.go#L6
kafka.ConfigMap{
"bootstrap.servers"̇: "..",
"security.protocol": "SSL",
// If you're using SSL authentication, provide the client's key here
"ssl.key.location": "path-to-private-key.pem",
"ssl.certificate.location": "path-to-public-key.pem",
"ssl.key.password": "if any..",
}
新错误请看这里
What does "SSL_CTX_use_PrivateKey_file" "problems getting password error" indicate in Nginx error log?
这里的解决方案是我丢失了 ssl.key.location。我不得不向我的管理员询问密钥。一旦我包含了密钥,一切正常。我的最终配置如下所示:
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "hostname:port-number",
"security.protocol": "SSL",
"ssl.ca.location": "ca-chain.pem",
"ssl.key.location": "key-location",
"ssl.certificate.location": "mycert.pem"})
if err != nil {
panic(err)
}