Kafka 连接转换。解析输入字符串并获取记录键

Kafka connection transformations. Parse input string and get a record key

我使用简单的文件源reader

connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1

文件内容是每行中的一个简单 JSON 对象。我发现有一种方法可以替换记录键并使用转换来执行此操作,例如

# Add the `id` field as the key using Simple Message Transformations
transforms=InsertKey

# `ValueToKey`: push an object of one of the column fields (`id`) into the key
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=ip

但是我得到一个错误

Only Struct objects supported for [copying fields from value to key], found: java.lang.String

有没有办法像我用 Flume 和 regex_extractor 那样解析字符串 json 并从那里获取密钥?

there is a way to replace a record key

有一个名为 org.apache.kafka.connect.transforms.ReplaceField$Key

的单独转换

InsertKey 将获取一个值并尝试插入 Struct/Map,但您似乎使用的是字符串键

在 SourceConnector 上使用转换时,转换是在 SourceConnector.poll()

返回的 List<SourceRecord> 上完成的

在您的例子中,FileStreamSourceConnector 读取文件的行并将每一行作为字符串放入 SourceRecord 对象中。因此,当转换得到SourceRecord时,它只把它看成一个String,并不知道对象的结构。

为了解决这个问题,

  1. 要么修改 FileStreamSourceConnector 代码,使其 returns SourceRecord 具有有效的 Struct and Schema of your input json String. You can use the Kafka's SchemaBuilder class。
  2. 或者,如果您在接收器连接器中使用此数据,您可以通过在接收器连接器上设置以下配置,让 KafkaConnect 将其转换为 JSON,然后在接收器连接器上进行转换。

"value.converter":"org.apache.kafka.connect.json.JsonConverter"
"value.converter.schemas.enable": "false"

如果您选择第二个选项,请不要忘记将这些配置放在您的 SourceConnector 上。

"value.convertor":"org.apache.kafka.connect.storage.StringConverter"
"value.converter.schemas.enable": "false"