Kerberos error while authenticating on Confluent Kafka
I have been trying to understand the integration of Apache Beam, Confluent Kafka and Dataflow with Python 3.8 and Beam SDK 2.7. The desired outcome is to build a pipeline (which will run on Dataflow) that consumes from Confluent Kafka and simply logs the messages on GCP. (I am using JDK 17, by the way.)
This is the code I am using:
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
import os
import json
import logging
os.environ['GOOGLE_APPLICATION_CREDENTIALS']='credentialsOld.json'
with open('cluster.configuration.json') as cluster:
    data = json.load(cluster)

def logger(element):
    logging.info('Something was found')
config = {
    "bootstrap.servers": data["bootstrap.servers"],
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "session.timeout.ms": data["session.timeout.ms"],
    "group.id": "tto",
    "sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username="{data["sasl.username"]}" password="{data["sasl.password"]}";',
    "auto.offset.reset": "earliest"
}
def main():
    print('======================================================')
    beam_options = PipelineOptions(runner='DataflowRunner', project='project', experiments=['use_runner_v2'], streaming=True, save_main_session=True, job_name='kafka-stream-test')
    with beam.Pipeline(options=beam_options) as p:
        msgs = p | 'ReadKafka' >> ReadFromKafka(consumer_config=config, topics=['users'])
        msgs | beam.FlatMap(logger)

if __name__ == '__main__':
    main()
I have tested this pipeline with both the Dataflow runner and the Direct runner, and on both I get this error: "Timeout while fetching topic metadata".
The error seems to be caused by the consumer failing to authenticate to Confluent Kafka, because I get these warnings:
WARNING:root:severity: WARN
timestamp {
seconds: 1650473787
nanos: 331000000
}
message: "[Consumer clientId=consumer-tto-1, groupId=tto] Bootstrap broker "Here is suposed to be mi broker ip but I wont show it" (id: -1 rack: null) disconnected"
instruction_id: "bundle_1"
transform_id: "ReadFromKafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/ParDo(GenerateKafkaSourceDescriptor)/ParMultiDo(GenerateKafkaSourceDescriptor)"
log_location: "org.apache.kafka.clients.NetworkClient"
thread: "21"
After that warning I get another one:
message: "[Consumer clientId=consumer-tto-1, groupId=tto] Error connecting to node "here is supposed to be my broker ip but I won't show it" (id: -1 rack: null)"
trace: "java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed]\n\tat
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login
Two important things: first, I have already built a consumer in plain Python, without the ReadFromKafka Apache Beam IO, and it connects to and consumes from the topic perfectly (a sketch of it is below), so the credentials I am using are the same and so is the protocol, "SASL_SSL" with "PLAIN" (related to this, I also don't understand why a Kerberos error appears, since I am not using Kerberos authentication at all)... The other thing is that the 'ReadFromKafka' transform runs through an expansion service, because this transform is only supported in Java, but thanks to Apache Beam's cross-language support I can use it from Python.
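For reference, the standalone consumer that does work looks roughly like this. This is only a minimal sketch: I am assuming the confluent_kafka (librdkafka) client and the same cluster.configuration.json; the polling loop is illustrative.

import json
import logging
from confluent_kafka import Consumer

with open('cluster.configuration.json') as cluster:
    data = json.load(cluster)

# Note: librdkafka accepts the plural key "sasl.mechanisms",
# which may be where the plural spelling in the Beam config came from.
consumer = Consumer({
    "bootstrap.servers": data["bootstrap.servers"],
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": data["sasl.username"],
    "sasl.password": data["sasl.password"],
    "group.id": "tto",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(['users'])

try:
    while True:
        msg = consumer.poll(1.0)          # wait up to 1 s for a message
        if msg is None:
            continue
        if msg.error():
            logging.warning(msg.error())  # surfaces broker/auth errors
            continue
        logging.info('Something was found')
finally:
    consumer.close()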
OK, this error was easy to fix: I had a typo in 'sasl.mechanisms', so the property was not recognized by the Java consumer. Because of that, the client fell back to its default SASL mechanism, GSSAPI, which is why a Kerberos error showed up even though Kerberos was never configured.
Instead of sasl.mechanisms, use sasl.mechanism.
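For clarity, a sketch of the corrected consumer_config, with everything else kept exactly as in the question and only that one key renamed:

config = {
    "bootstrap.servers": data["bootstrap.servers"],
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",  # singular: the key the Java consumer behind ReadFromKafka recognizes
    "session.timeout.ms": data["session.timeout.ms"],
    "group.id": "tto",
    "sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username="{data["sasl.username"]}" password="{data["sasl.password"]}";',
    "auto.offset.reset": "earliest"
}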