Sarama 无法与 Kafka 服务器通信
Sarama cannot talk to the Kafka server
所以我正在尝试配置 Sarama(kafka 的原生 go 客户端)生产者客户端。我相应地配置了我的 TLS,确保使用正确的密码生成客户端证书。我初始化客户端的 Go 代码如下所示:
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"io/ioutil"
"net"
"path/filepath"
"github.com/Shopify/sarama"
log "github.com/sirupsen/logrus"
)
const (
certFile = "client_ingestion_client.pem"
keyFile = "client_ingestion_client.key"
)
func InitKafkaClient(host string, port string, certPath string) (sarama.AsyncProducer, error) {
cf := filepath.Join(certPath, certFile)
kf := filepath.Join(certPath, keyFile)
// Log cert and key path
log.Debugln(cf)
log.Debugln(kf)
// Read the cert in
certIn, err := ioutil.ReadFile(cf)
if err != nil {
log.Error("cannot read cert", err)
return nil, err
}
// Read & decode the encrypted key file with the pass to make tls work
keyIn, err := ioutil.ReadFile(kf)
if err != nil {
log.Error("cannot read key", err)
return nil, err
}
// Decode and decrypt our PEM block as DER
decodedPEM, _ := pem.Decode([]byte(keyIn))
decrypedPemBlock, err := x509.DecryptPEMBlock(decodedPEM, []byte("m4d3ups3curity4k4fka?"))
if err != nil {
log.Error("cannot decrypt pem block", err)
return nil, err
}
// Parse the DER encoded block as PEM
rsaKey, err := x509.ParsePKCS1ParrivateKey(decrypedPemBlock)
if err != nil {
log.Error("failed to parse rsa as pem", err)
return nil, err
}
// Marshal the pem encoded RSA key to bytes in memory
pemdata := pem.EncodeToMemory(
&pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: x509.MarshalPKCS1PrivateKey(rsaKey),
},
)
if err != nil {
log.Error("cannot marshal rsa as pem in memory", err)
return nil, err
}
// Load our decrypted key pair
crt, err := tls.X509KeyPair(certIn, pemdata)
if err != nil {
log.Error("cannot load key pair", err)
return nil, err
}
config := sarama.NewConfig()
config.Net.TLS.Enable = true
config.Net.TLS.Config = &tls.Config{
Certificates: []tls.Certificate{crt},
CipherSuites: []uint16{
tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
},
}
// Setting this allows us not to read from successes channel
config.Producer.Return.Successes = false
// Setting this allows us not to read from errors channel
config.Producer.Return.Errors = false
client, err := sarama.NewClient([]string{net.JoinHostPort(host, port)}, config)
if err != nil {
return nil, err
}
return sarama.NewAsyncProducerFromClient(client)
}
当我初始化代码时,我收到一条错误消息:
time="2018-01-19T15:31:38Z" level=error msg="Error trying to setup kafka: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"
我已验证 Kafka 主机可达并且可以连接。见下文。
我通过检查 go 代码的输出到 openssl rsa -in client_ingestion_client.key -out decrypted.key
命令生成的输出来验证密钥是否被正确解密。我还确保密钥是使用带有正确标志的 keytool 正确生成的,包括 here.
中建议的 -keylag RSA 标志
我也 运行 openssl s_client -connect $KAFKA_HOST:$KAFKA_PORT
并得到了以下回复
verify error:num=19:self signed certificate in certificate chain
139901934057376:error:1408E0F4:SSL routines:ssl3_get_message:unexpected message:s3_both.c:408:
验证错误很好,因为我使用的是自签名证书,但我不知道接下来的错误是什么。也许这就是我的问题的原因?
进一步得到以下信息:
Requested Signature Algorithms: ECDSA+SHA512:RSA+SHA512:ECDSA+SHA384:RSA+SHA384:ECDSA+SHA256:RSA+SHA256:DSA+SHA256:ECDSA+SHA224:RSA+SHA224:DSA+SHA224:ECDSA+SHA1:RSA+SHA1:DSA+SHA1
Shared Requested Signature Algorithms: ECDSA+SHA512:RSA+SHA512:ECDSA+SHA384:RSA+SHA384:ECDSA+SHA256:RSA+SHA256:DSA+SHA256:ECDSA+SHA224:RSA+SHA224:DSA+SHA224:ECDSA+SHA1:RSA+SHA1:DSA+SHA1
Peer signing digest: SHA512
Server Temp Key: ECDH, P-256, 256 bits
---
SSL handshake has read 4668 bytes and written 169 bytes
---
New, TLSv1/SSLv3, Cipher is ECDHE-RSA-AES128-GCM-SHA256
Server public key is 2048 bit
Secure Renegotiation IS supported
Compression: NONE
Expansion: NONE
No ALPN negotiated
SSL-Session:
Protocol : TLSv1.2
Cipher : ECDHE-RSA-AES128-GCM-SHA256
Session-ID: 5A6216C765EF33BC85FACE82B01BC506358F4D62C77817A1F7EEFB50941DAEC9
Session-ID-ctx:
Master-Key: F8641FBF63A0AC7AB2D6D941C421DCA44550448524DADF4F0A7943F7928E65D5773E60A45212A7F320B250595AA6737B
Key-Arg : None
Krb5 Principal: None
PSK identity: None
PSK identity hint: None
Start Time: 1516377799
Timeout : 300 (sec)
Verify return code: 19 (self signed certificate in certificate chain)
---
由于在 openssl 连接中引用了此密码:
ECDHE-RSA-AES128-GCM-SHA256
我尝试将这个 tls.TLS_RSA_WITH_AES_128_GCM_SHA256
添加到我的 go 代码中,这看起来很接近,但我得到了相同的错误消息,说它有 运行 个可用的经纪人可以交谈到.
所以我发现了我的问题。事实证明,kafka 部署的子域有一个自签名证书,因此我必须在客户端的 config.Net.Tls.Config 结构中设置 InsecureSkipVerify: true
。所以代码看起来像:
config.Net.TLS.Config = &tls.Config{
Certificates: []tls.Certificate{crt},
InsecureSkipVerify: true,
}
也不需要包括密码套件。
所以我正在尝试配置 Sarama(kafka 的原生 go 客户端)生产者客户端。我相应地配置了我的 TLS,确保使用正确的密码生成客户端证书。我初始化客户端的 Go 代码如下所示:
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"io/ioutil"
"net"
"path/filepath"
"github.com/Shopify/sarama"
log "github.com/sirupsen/logrus"
)
const (
certFile = "client_ingestion_client.pem"
keyFile = "client_ingestion_client.key"
)
func InitKafkaClient(host string, port string, certPath string) (sarama.AsyncProducer, error) {
cf := filepath.Join(certPath, certFile)
kf := filepath.Join(certPath, keyFile)
// Log cert and key path
log.Debugln(cf)
log.Debugln(kf)
// Read the cert in
certIn, err := ioutil.ReadFile(cf)
if err != nil {
log.Error("cannot read cert", err)
return nil, err
}
// Read & decode the encrypted key file with the pass to make tls work
keyIn, err := ioutil.ReadFile(kf)
if err != nil {
log.Error("cannot read key", err)
return nil, err
}
// Decode and decrypt our PEM block as DER
decodedPEM, _ := pem.Decode([]byte(keyIn))
decrypedPemBlock, err := x509.DecryptPEMBlock(decodedPEM, []byte("m4d3ups3curity4k4fka?"))
if err != nil {
log.Error("cannot decrypt pem block", err)
return nil, err
}
// Parse the DER encoded block as PEM
rsaKey, err := x509.ParsePKCS1ParrivateKey(decrypedPemBlock)
if err != nil {
log.Error("failed to parse rsa as pem", err)
return nil, err
}
// Marshal the pem encoded RSA key to bytes in memory
pemdata := pem.EncodeToMemory(
&pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: x509.MarshalPKCS1PrivateKey(rsaKey),
},
)
if err != nil {
log.Error("cannot marshal rsa as pem in memory", err)
return nil, err
}
// Load our decrypted key pair
crt, err := tls.X509KeyPair(certIn, pemdata)
if err != nil {
log.Error("cannot load key pair", err)
return nil, err
}
config := sarama.NewConfig()
config.Net.TLS.Enable = true
config.Net.TLS.Config = &tls.Config{
Certificates: []tls.Certificate{crt},
CipherSuites: []uint16{
tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
},
}
// Setting this allows us not to read from successes channel
config.Producer.Return.Successes = false
// Setting this allows us not to read from errors channel
config.Producer.Return.Errors = false
client, err := sarama.NewClient([]string{net.JoinHostPort(host, port)}, config)
if err != nil {
return nil, err
}
return sarama.NewAsyncProducerFromClient(client)
}
当我初始化代码时,我收到一条错误消息:
time="2018-01-19T15:31:38Z" level=error msg="Error trying to setup kafka: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"
我已验证 Kafka 主机可达并且可以连接。见下文。
我通过检查 go 代码的输出到 openssl rsa -in client_ingestion_client.key -out decrypted.key
命令生成的输出来验证密钥是否被正确解密。我还确保密钥是使用带有正确标志的 keytool 正确生成的,包括 here.
我也 运行 openssl s_client -connect $KAFKA_HOST:$KAFKA_PORT
并得到了以下回复
verify error:num=19:self signed certificate in certificate chain
139901934057376:error:1408E0F4:SSL routines:ssl3_get_message:unexpected message:s3_both.c:408:
验证错误很好,因为我使用的是自签名证书,但我不知道接下来的错误是什么。也许这就是我的问题的原因?
进一步得到以下信息:
Requested Signature Algorithms: ECDSA+SHA512:RSA+SHA512:ECDSA+SHA384:RSA+SHA384:ECDSA+SHA256:RSA+SHA256:DSA+SHA256:ECDSA+SHA224:RSA+SHA224:DSA+SHA224:ECDSA+SHA1:RSA+SHA1:DSA+SHA1
Shared Requested Signature Algorithms: ECDSA+SHA512:RSA+SHA512:ECDSA+SHA384:RSA+SHA384:ECDSA+SHA256:RSA+SHA256:DSA+SHA256:ECDSA+SHA224:RSA+SHA224:DSA+SHA224:ECDSA+SHA1:RSA+SHA1:DSA+SHA1
Peer signing digest: SHA512
Server Temp Key: ECDH, P-256, 256 bits
---
SSL handshake has read 4668 bytes and written 169 bytes
---
New, TLSv1/SSLv3, Cipher is ECDHE-RSA-AES128-GCM-SHA256
Server public key is 2048 bit
Secure Renegotiation IS supported
Compression: NONE
Expansion: NONE
No ALPN negotiated
SSL-Session:
Protocol : TLSv1.2
Cipher : ECDHE-RSA-AES128-GCM-SHA256
Session-ID: 5A6216C765EF33BC85FACE82B01BC506358F4D62C77817A1F7EEFB50941DAEC9
Session-ID-ctx:
Master-Key: F8641FBF63A0AC7AB2D6D941C421DCA44550448524DADF4F0A7943F7928E65D5773E60A45212A7F320B250595AA6737B
Key-Arg : None
Krb5 Principal: None
PSK identity: None
PSK identity hint: None
Start Time: 1516377799
Timeout : 300 (sec)
Verify return code: 19 (self signed certificate in certificate chain)
---
由于在 openssl 连接中引用了此密码:
ECDHE-RSA-AES128-GCM-SHA256
我尝试将这个 tls.TLS_RSA_WITH_AES_128_GCM_SHA256
添加到我的 go 代码中,这看起来很接近,但我得到了相同的错误消息,说它有 运行 个可用的经纪人可以交谈到.
所以我发现了我的问题。事实证明,kafka 部署的子域有一个自签名证书,因此我必须在客户端的 config.Net.Tls.Config 结构中设置 InsecureSkipVerify: true
。所以代码看起来像:
config.Net.TLS.Config = &tls.Config{
Certificates: []tls.Certificate{crt},
InsecureSkipVerify: true,
}
也不需要包括密码套件。