Kafka Sink 连接器是否可以将记录时间戳作为存储在存储中的有效载荷包含在内
Can Kafka Sink Connectors include the record timestamp as the payload stored at the storage
我同时使用 S3 和 JDBC 接收器连接器,我在存储数据时遇到了一些奇怪的行为。对于一些协调,我真的很想将 Kafka 摄取时间或记录生成时间保存到存储在 Sink 系统中的数据中。
我正在查看文档,但没有找到。
我正在使用 Confluent 连接器,但如果它允许我这样做,我也可以使用其他连接器,如 Camel。
有人可以给我一些建议吗?
更新:
基于 onecricketeer 的良好反馈,我明白我应该看看这个:
https://docs.confluent.io/5.5.0/connect/transforms/insertfield.html#insertfield
而且我还看到了这个例子:
我会测试它,但我是否理解正确,例如,理论上我可以这样做:
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "recordOffset"
"transforms.InsertField.partition.field": "recordPartition"
"transforms.InsertField.timestamp.field": "recordTimestamp"
这会在名为 recordOffset、recordPartition 和 recordTimestamp 的记录中创建 3 个新属性,其中包含描述的值。
如果我想确保这些值始终存在或失败,我需要做的(不确定我是否理解后缀部分):
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"!transforms.InsertField.offset.field": "recordOffset"
"!transforms.InsertField.partition.field": "recordPartition"
"!transforms.InsertField.timestamp.field": "recordTimestamp"
正如 @OneCricketeer
所说,InsertField
单一消息转换在这里完成了工作。这是使用它的 S3 接收器配置示例:
{
"connector.class" : "io.confluent.connect.s3.S3SinkConnector",
"storage.class" : "io.confluent.connect.s3.storage.S3Storage",
"s3.region" : "us-west-2",
"s3.bucket.name" : "rmoff-smt-demo-01",
"topics" : "customers,transactions",
"tasks.max" : "4",
"flush.size" : "16",
"format.class" : "io.confluent.connect.s3.format.json.JsonFormat",
"schema.generator.class" : "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"schema.compatibility" : "NONE",
"partitioner.class" : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"transforms" : "insertTS,formatTS",
"transforms.insertTS.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field" : "messageTS",
"transforms.formatTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.formatTS.format" : "yyyy-MM-dd HH:mm:ss:SSS",
"transforms.formatTS.field" : "messageTS",
"transforms.formatTS.target.type" : "string"
}
请注意,它还使用 TimestampConverter
来格式化字符串中的时间戳 - 默认情况下它是一个纪元。
你的问题促使我write this up properly and record a little tutorial - you can see it here: https://youtu.be/3Gj_SoyuTYk
我同时使用 S3 和 JDBC 接收器连接器,我在存储数据时遇到了一些奇怪的行为。对于一些协调,我真的很想将 Kafka 摄取时间或记录生成时间保存到存储在 Sink 系统中的数据中。
我正在查看文档,但没有找到。 我正在使用 Confluent 连接器,但如果它允许我这样做,我也可以使用其他连接器,如 Camel。
有人可以给我一些建议吗?
更新: 基于 onecricketeer 的良好反馈,我明白我应该看看这个: https://docs.confluent.io/5.5.0/connect/transforms/insertfield.html#insertfield
而且我还看到了这个例子:
我会测试它,但我是否理解正确,例如,理论上我可以这样做:
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "recordOffset"
"transforms.InsertField.partition.field": "recordPartition"
"transforms.InsertField.timestamp.field": "recordTimestamp"
这会在名为 recordOffset、recordPartition 和 recordTimestamp 的记录中创建 3 个新属性,其中包含描述的值。
如果我想确保这些值始终存在或失败,我需要做的(不确定我是否理解后缀部分):
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"!transforms.InsertField.offset.field": "recordOffset"
"!transforms.InsertField.partition.field": "recordPartition"
"!transforms.InsertField.timestamp.field": "recordTimestamp"
正如 @OneCricketeer
所说,InsertField
单一消息转换在这里完成了工作。这是使用它的 S3 接收器配置示例:
{
"connector.class" : "io.confluent.connect.s3.S3SinkConnector",
"storage.class" : "io.confluent.connect.s3.storage.S3Storage",
"s3.region" : "us-west-2",
"s3.bucket.name" : "rmoff-smt-demo-01",
"topics" : "customers,transactions",
"tasks.max" : "4",
"flush.size" : "16",
"format.class" : "io.confluent.connect.s3.format.json.JsonFormat",
"schema.generator.class" : "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"schema.compatibility" : "NONE",
"partitioner.class" : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"transforms" : "insertTS,formatTS",
"transforms.insertTS.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field" : "messageTS",
"transforms.formatTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.formatTS.format" : "yyyy-MM-dd HH:mm:ss:SSS",
"transforms.formatTS.field" : "messageTS",
"transforms.formatTS.target.type" : "string"
}
请注意,它还使用 TimestampConverter
来格式化字符串中的时间戳 - 默认情况下它是一个纪元。
你的问题促使我write this up properly and record a little tutorial - you can see it here: https://youtu.be/3Gj_SoyuTYk