Kafka 0.10 SASL/PLAIN producer timeout
I have a 3-broker kerberised Kafka 0.10 install running in Cloudera, and I'm trying to get authentication working with SASL/PLAIN.
I'm passing kafka_server_jaas.conf to the JVM on each broker.
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=admin
password=password1
user_admin=password1
user_remote=password1;
};
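For reference, this file is handed to each broker JVM through the standard java.security.auth.login.config system property (the path below is just a placeholder for wherever the file lives on the broker; in Cloudera this typically goes into the broker's additional Java options):
-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf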
My server.properties (or kafka.properties, as Cloudera renames it) is set up as follows:
listeners=SASL_SSL://10.10.3.47:9093 # ip set for each broker
advertised.listeners=SASL_SSL://10.10.3.47:9093 # ip set for each broker
sasl.enabled.mechanisms=GSSAPI,PLAIN
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=GSSAPI
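Since the listeners are SASL_SSL, the brokers also need the usual SSL keystore/truststore settings. They're not shown here, but for reference they look something like this (paths and passwords are placeholders):
ssl.keystore.location=/path/to/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/path/to/kafka.server.truststore.jks
ssl.truststore.password=truststore-password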
When Kafka starts, inter-broker communication all works fine, but when I try to connect with the console producer I get a timeout failure: unable to update metadata.
bin/kafka-console-producer --broker-list 10.10.3.161:9093 --topic test1 --producer.config client.properties.plain
client.properties.plain is set to:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
Finally, the client jaas.conf:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="remote"
password="password1";
};
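For completeness, the client JAAS file is picked up the same way as on the brokers, via the java.security.auth.login.config system property, which the console tools read from KAFKA_OPTS (the path is a placeholder):
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf"
bin/kafka-console-producer --broker-list 10.10.3.161:9093 --topic test1 --producer.config client.properties.plain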
As far as I can tell I've followed all the instructions correctly. Can anyone see what's wrong?
Update
I turned the logging up a little on the console consumer and get the following error:
[2017-03-02 13:17:50,817] TRACE SSLHandshake NEED_UNWRAP channelId -1, handshakeResult Status = OK HandshakeStatus = FINISHED
bytesConsumed = 101 bytesProduced = 0, appReadBuffer pos 0, netReadBuffer pos 0, netWriteBuffer pos 101 (org.apache.kafka.common.network.SslTransportLayer)
[2017-03-02 13:17:50,817] TRACE SSLHandshake FINISHED channelId -1, appReadBuffer pos 0, netReadBuffer pos 0, netWriteBuffer pos 101 (org.apache.kafka.common.network.SslTransportLayer)
[2017-03-02 13:17:50,817] DEBUG Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2017-03-02 13:17:50,818] DEBUG Set SASL client state to INITIAL (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2017-03-02 13:17:50,819] DEBUG Set SASL client state to INTERMEDIATE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2017-03-02 13:17:50,820] DEBUG Connection with <IPADDESS_REMOVED> disconnected (org.apache.kafka.common.network.Selector)
java.io.EOFException
at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:488)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:239)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:182)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:64)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:318)
at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:974)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:61)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:64)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:51)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
[2017-03-02 13:17:50,821] DEBUG Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
I had a similar issue with SASL_PLAINTEXT auth. I was able to connect to the broker (via kafka-python), but any messages I sent from the producer would time out.
I ended up publishing both SASL_PLAINTEXT and PLAINTEXT listeners, but only exposed the SASL_PLAINTEXT listener through the AWS security group.
My server_jaas.conf is basically the same.
My server.properties used these settings:
security.inter.broker.protocol=PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
advertised.listeners=SASL_PLAINTEXT://example.com:9095,PLAINTEXT://example.com:9092
listeners = SASL_PLAINTEXT://0.0.0.0:9095,PLAINTEXT://0.0.0.0:9092
I was using the kafka-python client to debug, and my call looks like this (Python):
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='example.com:9095', security_protocol="SASL_PLAINTEXT", sasl_mechanism='PLAIN', sasl_plain_username='username', sasl_plain_password='password')
With this setup I was able to authenticate with username/password and also produce and consume messages against the broker without timeouts.
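For completeness, the consumer side is configured the same way. A minimal sketch with kafka-python, assuming the same credentials and a hypothetical topic name 'test1':
from kafka import KafkaConsumer
consumer = KafkaConsumer('test1', bootstrap_servers='example.com:9095', security_protocol='SASL_PLAINTEXT', sasl_mechanism='PLAIN', sasl_plain_username='username', sasl_plain_password='password')
for message in consumer:
    print(message.value)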
Hope this helps in some way :)
In my case there was no need to add plaintext listeners or advertised listeners. Instead, the problem was in my kafka_server_jaas.conf. Setting the username property to the name the client uses to log in solved my problem.
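To illustrate what that means (just a sketch based on the description above, reusing the placeholder password from the question), the KafkaServer entry ends up with username matching the name the client presents, alongside the corresponding user_<name> entry:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="remote"
password="password1"
user_remote="password1";
};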