Kafka Connect S3 动态 S3 文件夹结构创建?
Kafka Connect S3 Dynamic S3 Folder Structure Creation?
我 manually installed Confluent Kafka Connect S3 使用独立方法而不是通过 Confluent 的过程或作为整个平台的一部分。
我可以使用以下命令从命令行成功启动连接器:
./kafka_2.11-2.1.0/bin/connect-standalone.sh connect.properties s3-sink.properties
可以看到来自 AWS MSK 的主题 CDC 偏移量正在被消耗。不会抛出任何错误。但是,在 AWS S3 中,没有为新数据创建文件夹结构,也没有存储 JSON 数据。
问题
- 连接器是否应该动态创建文件夹结构
看到主题的第一个 JSON 数据包?
- 除了配置
awscli 凭据,connect.properties 和 s3-sink.properties 是
还有任何其他设置需要设置才能正确连接到
S3桶?
- 关于安装文档的建议更多
比 Confluent 网站上的独立文档更全面? (以上链接)
connect.properties
bootstrap.servers=redacted:9092,redacted:9092,redacted:9092
plugin.path=/plugins/kafka-connect-s3 key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
s3-sink.properties
name=s3-sink connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=database_schema_topic1,database_schema_topic2,database_schema_topic3
s3.region=us-east-2 s3.bucket.name=databasekafka s3.part.size=5242880
flush.size=1 storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
连接器是否应该在看到主题的第一个 JSON 数据包时动态创建文件夹结构?
是的,即使您使用参数 "topics.dir" 和 "path.format"
控制此路径(目录结构)
除了配置 awscli 凭据,connect.properties 和 s3-sink.properties 是否需要设置任何其他设置才能正确连接到 S3 存储桶?
默认情况下,S3 连接器将通过环境变量或凭据文件使用 Aws 凭据(访问 ID 和密钥)。
可以通过修改参数"s3.credentials.provider.class"来改变。参数的默认值为 "DefaultAWSCredentialsProviderChain"
关于安装文档的建议比 Confluent 网站上的独立文档更全面? (以上链接)
我建议您使用分布式模式,因为它为您的连接集群和其上的连接器 运行 提供了高可用性。
您可以通过以下文档以分布式模式配置连接集群。
https://docs.confluent.io/current/connect/userguide.html#connect-userguide-dist-worker-config
我 manually installed Confluent Kafka Connect S3 使用独立方法而不是通过 Confluent 的过程或作为整个平台的一部分。
我可以使用以下命令从命令行成功启动连接器:
./kafka_2.11-2.1.0/bin/connect-standalone.sh connect.properties s3-sink.properties
可以看到来自 AWS MSK 的主题 CDC 偏移量正在被消耗。不会抛出任何错误。但是,在 AWS S3 中,没有为新数据创建文件夹结构,也没有存储 JSON 数据。
问题
- 连接器是否应该动态创建文件夹结构 看到主题的第一个 JSON 数据包?
- 除了配置 awscli 凭据,connect.properties 和 s3-sink.properties 是 还有任何其他设置需要设置才能正确连接到 S3桶?
- 关于安装文档的建议更多 比 Confluent 网站上的独立文档更全面? (以上链接)
connect.properties
bootstrap.servers=redacted:9092,redacted:9092,redacted:9092
plugin.path=/plugins/kafka-connect-s3 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets
s3-sink.properties
name=s3-sink connector.class=io.confluent.connect.s3.S3SinkConnector tasks.max=1 topics=database_schema_topic1,database_schema_topic2,database_schema_topic3 s3.region=us-east-2 s3.bucket.name=databasekafka s3.part.size=5242880 flush.size=1 storage.class=io.confluent.connect.s3.storage.S3Storage format.class=io.confluent.connect.s3.format.json.JsonFormat schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner schema.compatibility=NONE
连接器是否应该在看到主题的第一个 JSON 数据包时动态创建文件夹结构? 是的,即使您使用参数 "topics.dir" 和 "path.format"
控制此路径(目录结构)除了配置 awscli 凭据,connect.properties 和 s3-sink.properties 是否需要设置任何其他设置才能正确连接到 S3 存储桶? 默认情况下,S3 连接器将通过环境变量或凭据文件使用 Aws 凭据(访问 ID 和密钥)。 可以通过修改参数"s3.credentials.provider.class"来改变。参数的默认值为 "DefaultAWSCredentialsProviderChain"
关于安装文档的建议比 Confluent 网站上的独立文档更全面? (以上链接) 我建议您使用分布式模式,因为它为您的连接集群和其上的连接器 运行 提供了高可用性。 您可以通过以下文档以分布式模式配置连接集群。 https://docs.confluent.io/current/connect/userguide.html#connect-userguide-dist-worker-config