如何将kafka IO从apache beam连接到融合云中的集群
How to connect kafka IO from apache beam to a cluster in confluent cloud
我在 Python 中创建了一个简单的管道来从 kafka 读取数据,问题是 kafka 集群位于融合的云上,我在连接它时遇到了一些问题。
我在数据流作业上收到以下日志:
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
at org.apache.beam.sdk.io.kafka.KafkaIO$Read$GenerateKafkaSourceDescriptor.processElement(KafkaIO.java:1495)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
所以我认为我在传递配置时遗漏了一些东西,因为它提到了与之相关的东西,我对所有这些都很陌生,我对 java 一无所知,所以我什至不知道如何继续阅读JAAS 文档。
管道代码如下:
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
import os
import json
import logging
os.environ['GOOGLE_APPLICATION_CREDENTIALS']='credentialsOld.json'
with open('cluster.configuration.json') as cluster:
data=json.load(cluster)
cluster.close()
def logger(element):
logging.INFO('Something was found')
def main():
config={
"bootstrap.servers":data["bootstrap.servers"],
"security.protocol":data["security.protocol"],
"sasl.mechanisms":data["sasl.mechanisms"],
"sasl.username":data["sasl.username"],
"sasl.password":data["sasl.password"],
"session.timeout.ms":data["session.timeout.ms"],
"auto.offset.reset":"earliest"
}
print('======================================================')
beam_options = PipelineOptions(runner='DataflowRunner',project='project',experiments=['use_runner_v2'],streaming=True,save_main_session=True,job_name='kafka-stream-test')
with beam.Pipeline(options=beam_options) as p:
msgs = p | 'ReadKafka' >> ReadFromKafka(consumer_config=config,topics=['users'],expansion_service="localhost:8088")
msgs | beam.FlatMap(logger)
if __name__ == '__main__':
main()
我读到一些关于在配置字典中传递 属性 java.security.auth.login.config 的内容,但是因为那个例子是 java 而我正在使用 python 我真的迷失了我必须通过什么,或者即使那是 属性 我必须通过等等
顺便说一句,我从这里得到了 api 密钥和秘密,这就是我传递给 sasl.username 和 sasl.password
的内容
自从我解决了这个问题后,我得到了这个问题的部分答案,但遇到了另一个问题:
config={
"bootstrap.servers":data["bootstrap.servers"],
"security.protocol":data["security.protocol"],
"sasl.mechanisms":data["sasl.mechanisms"],
"sasl.username":data["sasl.username"],
"sasl.password":data["sasl.password"],
"session.timeout.ms":data["session.timeout.ms"],
"group.id":"tto",
"sasl.jaas.config":f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username=\"{data["sasl.username"]}\" password=\"{data["sasl.password"]}\";',
"auto.offset.reset":"earliest"
}
我需要向 sasl.jaas.config 属性提供我的集群的 api 密钥和秘密以及服务名称,但是,现在我在 运行ning 时面临不同的错误数据流上的管道:
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
尝试 运行 数据流上的作业 4-5 分钟后显示此错误,实际上我不知道如何解决此问题,但我认为这与我的代理拒绝连接有关,我认为这可能与区域执行有关,因为集群与作业区域位于不同的区域。
更新:
我在 linux/ubuntu 上测试了代码,但我不知道为什么会自动下载 expansión 服务,因此您不会收到未经处理的信号错误,但在尝试验证融合 kafka 时仍然遇到一些问题。
我第一次尝试 beam 的扩展服务时遇到了同样的错误。您提供的密钥 sasl.mechanisms
不正确,尝试使用 sasl.mechanism
您也不需要提供用户名和密码,因为您的连接是由 jasl 验证的,基本上像下面这样的 consumer_config 有效对我来说:
config={
"bootstrap.servers":data["bootstrap.servers"],
"security.protocol":data["security.protocol"],
"sasl.mechanism":data["sasl.mechanisms"],
"session.timeout.ms":data["session.timeout.ms"],
"group.id":"tto",
"sasl.jaas.config":f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username=\"{data["sasl.username"]}\" password=\"{data["sasl.password"]}\";',
"auto.offset.reset":"earliest"
}
我在 Python 中创建了一个简单的管道来从 kafka 读取数据,问题是 kafka 集群位于融合的云上,我在连接它时遇到了一些问题。
我在数据流作业上收到以下日志:
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
at org.apache.beam.sdk.io.kafka.KafkaIO$Read$GenerateKafkaSourceDescriptor.processElement(KafkaIO.java:1495)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
所以我认为我在传递配置时遗漏了一些东西,因为它提到了与之相关的东西,我对所有这些都很陌生,我对 java 一无所知,所以我什至不知道如何继续阅读JAAS 文档。
管道代码如下:
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
import os
import json
import logging
os.environ['GOOGLE_APPLICATION_CREDENTIALS']='credentialsOld.json'
with open('cluster.configuration.json') as cluster:
data=json.load(cluster)
cluster.close()
def logger(element):
logging.INFO('Something was found')
def main():
config={
"bootstrap.servers":data["bootstrap.servers"],
"security.protocol":data["security.protocol"],
"sasl.mechanisms":data["sasl.mechanisms"],
"sasl.username":data["sasl.username"],
"sasl.password":data["sasl.password"],
"session.timeout.ms":data["session.timeout.ms"],
"auto.offset.reset":"earliest"
}
print('======================================================')
beam_options = PipelineOptions(runner='DataflowRunner',project='project',experiments=['use_runner_v2'],streaming=True,save_main_session=True,job_name='kafka-stream-test')
with beam.Pipeline(options=beam_options) as p:
msgs = p | 'ReadKafka' >> ReadFromKafka(consumer_config=config,topics=['users'],expansion_service="localhost:8088")
msgs | beam.FlatMap(logger)
if __name__ == '__main__':
main()
我读到一些关于在配置字典中传递 属性 java.security.auth.login.config 的内容,但是因为那个例子是 java 而我正在使用 python 我真的迷失了我必须通过什么,或者即使那是 属性 我必须通过等等
顺便说一句,我从这里得到了 api 密钥和秘密,这就是我传递给 sasl.username 和 sasl.password
的内容自从我解决了这个问题后,我得到了这个问题的部分答案,但遇到了另一个问题:
config={
"bootstrap.servers":data["bootstrap.servers"],
"security.protocol":data["security.protocol"],
"sasl.mechanisms":data["sasl.mechanisms"],
"sasl.username":data["sasl.username"],
"sasl.password":data["sasl.password"],
"session.timeout.ms":data["session.timeout.ms"],
"group.id":"tto",
"sasl.jaas.config":f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username=\"{data["sasl.username"]}\" password=\"{data["sasl.password"]}\";',
"auto.offset.reset":"earliest"
}
我需要向 sasl.jaas.config 属性提供我的集群的 api 密钥和秘密以及服务名称,但是,现在我在 运行ning 时面临不同的错误数据流上的管道:
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
尝试 运行 数据流上的作业 4-5 分钟后显示此错误,实际上我不知道如何解决此问题,但我认为这与我的代理拒绝连接有关,我认为这可能与区域执行有关,因为集群与作业区域位于不同的区域。
更新:
我在 linux/ubuntu 上测试了代码,但我不知道为什么会自动下载 expansión 服务,因此您不会收到未经处理的信号错误,但在尝试验证融合 kafka 时仍然遇到一些问题。
我第一次尝试 beam 的扩展服务时遇到了同样的错误。您提供的密钥 sasl.mechanisms
不正确,尝试使用 sasl.mechanism
您也不需要提供用户名和密码,因为您的连接是由 jasl 验证的,基本上像下面这样的 consumer_config 有效对我来说:
config={
"bootstrap.servers":data["bootstrap.servers"],
"security.protocol":data["security.protocol"],
"sasl.mechanism":data["sasl.mechanisms"],
"session.timeout.ms":data["session.timeout.ms"],
"group.id":"tto",
"sasl.jaas.config":f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username=\"{data["sasl.username"]}\" password=\"{data["sasl.password"]}\";',
"auto.offset.reset":"earliest"
}