如何将kafka IO从apache beam连接到融合云中的集群

How to connect kafka IO from apache beam to a cluster in confluent cloud

我在 Python 中创建了一个简单的管道来从 kafka 读取数据,问题是 kafka 集群位于融合的云上,我在连接它时遇到了一些问题。

我在数据流作业上收到以下日志:

Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
    at org.apache.beam.sdk.io.kafka.KafkaIO$Read$GenerateKafkaSourceDescriptor.processElement(KafkaIO.java:1495)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set

所以我认为我在传递配置时遗漏了一些东西,因为它提到了与之相关的东西,我对所有这些都很陌生,我对 java 一无所知,所以我什至不知道如何继续阅读JAAS 文档。

管道代码如下:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
import os
import json
import logging

os.environ['GOOGLE_APPLICATION_CREDENTIALS']='credentialsOld.json'

with open('cluster.configuration.json') as cluster:
    data=json.load(cluster)
    cluster.close()

def logger(element):
    logging.INFO('Something was found')  
      
def main():
    config={
        "bootstrap.servers":data["bootstrap.servers"],
        "security.protocol":data["security.protocol"],
        "sasl.mechanisms":data["sasl.mechanisms"],
        "sasl.username":data["sasl.username"],
        "sasl.password":data["sasl.password"],
        "session.timeout.ms":data["session.timeout.ms"],
        "auto.offset.reset":"earliest"
    }
    print('======================================================')
    beam_options = PipelineOptions(runner='DataflowRunner',project='project',experiments=['use_runner_v2'],streaming=True,save_main_session=True,job_name='kafka-stream-test')
    with beam.Pipeline(options=beam_options) as p:
        msgs = p | 'ReadKafka' >> ReadFromKafka(consumer_config=config,topics=['users'],expansion_service="localhost:8088")
        msgs | beam.FlatMap(logger)
        
if __name__ == '__main__':    
    main()

我读到一些关于在配置字典中传递 属性 java.security.auth.login.config 的内容,但是因为那个例子是 java 而我正在使用 python 我真的迷失了我必须通过什么,或者即使那是 属性 我必须通过等等

顺便说一句,我从这里得到了 api 密钥和秘密,这就是我传递给 sasl.username 和 sasl.password

的内容

自从我解决了这个问题后,我得到了这个问题的部分答案,但遇到了另一个问题:

config={
        "bootstrap.servers":data["bootstrap.servers"],
        "security.protocol":data["security.protocol"],
        "sasl.mechanisms":data["sasl.mechanisms"],
        "sasl.username":data["sasl.username"],
        "sasl.password":data["sasl.password"],
        "session.timeout.ms":data["session.timeout.ms"],
        "group.id":"tto",
        "sasl.jaas.config":f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username=\"{data["sasl.username"]}\" password=\"{data["sasl.password"]}\";',
        "auto.offset.reset":"earliest"
    }

我需要向 sasl.jaas.config 属性提供我的集群的 api 密钥和秘密以及服务名称,但是,现在我在 运行ning 时面临不同的错误数据流上的管道:

Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata 

尝试 运行 数据流上的作业 4-5 分钟后显示此错误,实际上我不知道如何解决此问题,但我认为这与我的代理拒绝连接有关,我认为这可能与区域执行有关,因为集群与作业区域位于不同的区域。

更新:

我在 linux/ubuntu 上测试了代码,但我不知道为什么会自动下载 expansión 服务,因此您不会收到未经处理的信号错误,但在尝试验证融合 kafka 时仍然遇到一些问题。

我第一次尝试 beam 的扩展服务时遇到了同样的错误。您提供的密钥 sasl.mechanisms 不正确,尝试使用 sasl.mechanism 您也不需要提供用户名和密码,因为您的连接是由 jasl 验证的,基本上像下面这样的 consumer_config 有效对我来说:

config={
        "bootstrap.servers":data["bootstrap.servers"],
        "security.protocol":data["security.protocol"],
        "sasl.mechanism":data["sasl.mechanisms"],
        "session.timeout.ms":data["session.timeout.ms"],
        "group.id":"tto",
"sasl.jaas.config":f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username=\"{data["sasl.username"]}\" password=\"{data["sasl.password"]}\";',
        "auto.offset.reset":"earliest"
    }