Kafka 连接转换。解析输入字符串并获取记录键
Kafka connection transformations. Parse input string and get a record key
我使用简单的文件源reader
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
文件内容是每行中的一个简单 JSON 对象。我发现有一种方法可以替换记录键并使用转换来执行此操作,例如
# Add the `id` field as the key using Simple Message Transformations
transforms=InsertKey
# `ValueToKey`: push an object of one of the column fields (`id`) into the key
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=ip
但是我得到一个错误
Only Struct objects supported for [copying fields from value to key],
found: java.lang.String
有没有办法像我用 Flume 和 regex_extractor 那样解析字符串 json 并从那里获取密钥?
there is a way to replace a record key
有一个名为 org.apache.kafka.connect.transforms.ReplaceField$Key
的单独转换
InsertKey
将获取一个值并尝试插入 Struct/Map,但您似乎使用的是字符串键
在 SourceConnector 上使用转换时,转换是在 SourceConnector.poll()
返回的 List<SourceRecord>
上完成的
在您的例子中,FileStreamSourceConnector
读取文件的行并将每一行作为字符串放入 SourceRecord
对象中。因此,当转换得到SourceRecord
时,它只把它看成一个String,并不知道对象的结构。
为了解决这个问题,
- 要么修改
FileStreamSourceConnector
代码,使其 returns SourceRecord
具有有效的 Struct and Schema of your input json String. You can use the Kafka's SchemaBuilder class。
- 或者,如果您在接收器连接器中使用此数据,您可以通过在接收器连接器上设置以下配置,让 KafkaConnect 将其转换为 JSON,然后在接收器连接器上进行转换。
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
"value.converter.schemas.enable": "false"
如果您选择第二个选项,请不要忘记将这些配置放在您的 SourceConnector 上。
"value.convertor":"org.apache.kafka.connect.storage.StringConverter"
"value.converter.schemas.enable": "false"
我使用简单的文件源reader
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
文件内容是每行中的一个简单 JSON 对象。我发现有一种方法可以替换记录键并使用转换来执行此操作,例如
# Add the `id` field as the key using Simple Message Transformations
transforms=InsertKey
# `ValueToKey`: push an object of one of the column fields (`id`) into the key
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=ip
但是我得到一个错误
Only Struct objects supported for [copying fields from value to key], found: java.lang.String
有没有办法像我用 Flume 和 regex_extractor 那样解析字符串 json 并从那里获取密钥?
there is a way to replace a record key
有一个名为 org.apache.kafka.connect.transforms.ReplaceField$Key
InsertKey
将获取一个值并尝试插入 Struct/Map,但您似乎使用的是字符串键
在 SourceConnector 上使用转换时,转换是在 SourceConnector.poll()
List<SourceRecord>
上完成的
在您的例子中,FileStreamSourceConnector
读取文件的行并将每一行作为字符串放入 SourceRecord
对象中。因此,当转换得到SourceRecord
时,它只把它看成一个String,并不知道对象的结构。
为了解决这个问题,
- 要么修改
FileStreamSourceConnector
代码,使其 returnsSourceRecord
具有有效的 Struct and Schema of your input json String. You can use the Kafka's SchemaBuilder class。 - 或者,如果您在接收器连接器中使用此数据,您可以通过在接收器连接器上设置以下配置,让 KafkaConnect 将其转换为 JSON,然后在接收器连接器上进行转换。
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
"value.converter.schemas.enable": "false"
如果您选择第二个选项,请不要忘记将这些配置放在您的 SourceConnector 上。
"value.convertor":"org.apache.kafka.connect.storage.StringConverter"
"value.converter.schemas.enable": "false"