正确配置 Kafka Connect S3 Sink TimeBasedPartitioner
Properly Configuring Kafka Connect S3 Sink TimeBasedPartitioner
我正在尝试使用 Confluent S3 接收器的 TimeBasedPartitioner。这是我的配置:
{
"name":"s3-sink",
"config":{
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"tasks.max":"1",
"file":"test.sink.txt",
"topics":"xxxxx",
"s3.region":"yyyyyy",
"s3.bucket.name":"zzzzzzz",
"s3.part.size":"5242880",
"flush.size":"1000",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"format.class":"io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"timestamp.extractor":"Record",
"timestamp.field":"local_timestamp",
"path.format":"YYYY-MM-dd-HH",
"partition.duration.ms":"3600000",
"schema.compatibility":"NONE"
}
}
数据是二进制的,我为它使用了 avro 方案。我想使用实际的记录字段 "local_timestamp" 这是一个 UNIX 时间戳来划分数据,比如每小时文件。
我使用通常的 REST API 调用启动连接器
curl -X POST -H "Content-Type: application/json" --data @s3-config.json http://localhost:8083/connectors
很遗憾,数据没有按照我的意愿进行分区。我还尝试删除冲洗尺寸,因为这可能会干扰。但是后来我得到了错误
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nMissing required configuration \"flush.size\" which has no default value.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}%
知道如何正确设置 TimeBasedPartioner 吗?我找不到一个有效的例子。
另外,如何调试此类问题或进一步了解连接器实际在做什么?
非常感谢任何帮助或进一步的建议。
研究 TimeBasedPartitioner.java 的代码和
的日志后
confluent log connect tail -f
我意识到时区和区域设置都是强制性的,尽管 Confluent S3 Connector 文档中并未如此指定。以下配置字段解决了问题,让我将正确分区的记录上传到 S3 存储桶:
"flush.size": "10000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "US",
"timezone": "UTC",
"partition.duration.ms": "3600000",
"timestamp.extractor": "RecordField",
"timestamp.field": "local_timestamp",
注意两件事:首先 flush.size 的值也是必需的,文件最终会被分成更小的块,不大于 flush.size 指定的值。其次,如上所示,最好选择 path.format,以便生成正确的树结构。
我仍然不能 100% 确定记录字段 local_timestamp 是否真的用于对记录进行分区。
非常欢迎任何意见或改进。
确实,您修改后的配置似乎是正确的。
具体来说,将 timestamp.extractor
设置为 RecordField
允许您根据记录具有的时间戳字段对文件进行分区,并且您通过设置 属性 timestamp.field
.
当改为设置 timestamp.extractor=Record
时,基于时间的分区程序将为每条记录使用 Kafka 时间戳。
关于 flush.size
,将此 属性 设置为高值(例如 Integer.MAX_VALUE
)实际上等同于忽略它。
最后,最新版本的连接器不再需要 schema.generator.class
。
我正在尝试使用 Confluent S3 接收器的 TimeBasedPartitioner。这是我的配置:
{
"name":"s3-sink",
"config":{
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"tasks.max":"1",
"file":"test.sink.txt",
"topics":"xxxxx",
"s3.region":"yyyyyy",
"s3.bucket.name":"zzzzzzz",
"s3.part.size":"5242880",
"flush.size":"1000",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"format.class":"io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"timestamp.extractor":"Record",
"timestamp.field":"local_timestamp",
"path.format":"YYYY-MM-dd-HH",
"partition.duration.ms":"3600000",
"schema.compatibility":"NONE"
}
}
数据是二进制的,我为它使用了 avro 方案。我想使用实际的记录字段 "local_timestamp" 这是一个 UNIX 时间戳来划分数据,比如每小时文件。
我使用通常的 REST API 调用启动连接器
curl -X POST -H "Content-Type: application/json" --data @s3-config.json http://localhost:8083/connectors
很遗憾,数据没有按照我的意愿进行分区。我还尝试删除冲洗尺寸,因为这可能会干扰。但是后来我得到了错误
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nMissing required configuration \"flush.size\" which has no default value.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}%
知道如何正确设置 TimeBasedPartioner 吗?我找不到一个有效的例子。
另外,如何调试此类问题或进一步了解连接器实际在做什么?
非常感谢任何帮助或进一步的建议。
研究 TimeBasedPartitioner.java 的代码和
的日志后confluent log connect tail -f
我意识到时区和区域设置都是强制性的,尽管 Confluent S3 Connector 文档中并未如此指定。以下配置字段解决了问题,让我将正确分区的记录上传到 S3 存储桶:
"flush.size": "10000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "US",
"timezone": "UTC",
"partition.duration.ms": "3600000",
"timestamp.extractor": "RecordField",
"timestamp.field": "local_timestamp",
注意两件事:首先 flush.size 的值也是必需的,文件最终会被分成更小的块,不大于 flush.size 指定的值。其次,如上所示,最好选择 path.format,以便生成正确的树结构。
我仍然不能 100% 确定记录字段 local_timestamp 是否真的用于对记录进行分区。
非常欢迎任何意见或改进。
确实,您修改后的配置似乎是正确的。
具体来说,将 timestamp.extractor
设置为 RecordField
允许您根据记录具有的时间戳字段对文件进行分区,并且您通过设置 属性 timestamp.field
.
当改为设置 timestamp.extractor=Record
时,基于时间的分区程序将为每条记录使用 Kafka 时间戳。
关于 flush.size
,将此 属性 设置为高值(例如 Integer.MAX_VALUE
)实际上等同于忽略它。
最后,最新版本的连接器不再需要 schema.generator.class
。