Kafka Connect S3 动态 S3 文件夹结构创建?

Kafka Connect S3 Dynamic S3 Folder Structure Creation?

manually installed Confluent Kafka Connect S3 使用独立方法而不是通过 Confluent 的过程或作为整个平台的一部分。

我可以使用以下命令从命令行成功启动连接器:

./kafka_2.11-2.1.0/bin/connect-standalone.sh connect.properties s3-sink.properties

可以看到来自 AWS MSK 的主题 CDC 偏移量正在被消耗。不会抛出任何错误。但是,在 AWS S3 中,没有为新数据创建文件夹结构,也没有存储 JSON 数据。

问题

  1. 连接器是否应该动态创建文件夹结构 看到主题的第一个 JSON 数据包?
  2. 除了配置 awscli 凭据,connect.properties 和 s3-sink.properties 是 还有任何其他设置需要设置才能正确连接到 S3桶?
  3. 关于安装文档的建议更多 比 Confluent 网站上的独立文档更全面? (以上链接)

connect.properties

bootstrap.servers=redacted:9092,redacted:9092,redacted:9092

plugin.path=/plugins/kafka-connect-s3 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets

s3-sink.properties

name=s3-sink connector.class=io.confluent.connect.s3.S3SinkConnector tasks.max=1 topics=database_schema_topic1,database_schema_topic2,database_schema_topic3 s3.region=us-east-2 s3.bucket.name=databasekafka s3.part.size=5242880 flush.size=1 storage.class=io.confluent.connect.s3.storage.S3Storage format.class=io.confluent.connect.s3.format.json.JsonFormat schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner schema.compatibility=NONE

连接器是否应该在看到主题的第一个 JSON 数据包时动态创建文件夹结构? 是的,即使您使用参数 "topics.dir" 和 "path.format"

控制此路径(目录结构)

除了配置 awscli 凭据,connect.properties 和 s3-sink.properties 是否需要设置任何其他设置才能正确连接到 S3 存储桶? 默认情况下,S3 连接器将通过环境变量或凭据文件使用 Aws 凭据(访问 ID 和密钥)。 可以通过修改参数"s3.credentials.provider.class"来改变。参数的默认值为 "DefaultAWSCredentialsProviderChain"

关于安装文档的建议比 Confluent 网站上的独立文档更全面? (以上链接) 我建议您使用分布式模式,因为它为您的连接集群和其上的连接器 运行 提供了高可用性。 您可以通过以下文档以分布式模式配置连接集群。 https://docs.confluent.io/current/connect/userguide.html#connect-userguide-dist-worker-config