Kafka:将主题 A 复制到主题 B,同时对记录应用转换

Kafka: Replicate topic A to topic B while applying a transformation to the records

我需要将记录从集群 A 上的主题镜像到集群 B 上的主题同时在代理时向记录添加字段(例如 InsertField.

我不控制集群 A(但可能需要更改)并且完全控制集群 B。

我知道集群 A 正在发送序列化 JSON。

我正在使用 MirrorMaker API 和 Kafka 连接来进行镜像,我正在尝试使用 InsertField 转换在代理时在记录上添加数据。

我的配置是这样的:

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
topics=.*
source.cluster.alias=upstream
source.cluster.bootstrap.servers=source:9092
target.cluster.bootstrap.servers=target:9092

# ByteArrayConverter to avoid MirrorMaker to re-encode messages
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter


transforms=InsertSource1
transforms.InsertSource1.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource1.static.field=test_inser
transforms.InsertSource1.static.value=test_value
name=somerandomname

此代码将失败并显示错误:

org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [field insertion]

有没有一种方法可以在不编写自定义转换的情况下实现这一点(我正在使用 Python 并且我不熟悉 Java)

非常感谢

如评论所述,字节数组转换器没有 Struct/Schema 信息,因此无法使用您正在使用的转换(添加字段)。

这并不意味着不能使用转换,但是


如果您要发送 JSON 消息,则必须发送架构和负载信息。

Apache Kafka (2.6.0) 的当前版本中,您 无法将 InsertField 单个消息转换 (SMT) 应用到 MirrorMaker 2.0 记录。

说明

MirrorMaker 2.0 is based on Kafka Connect framework and, internally, the MirrorMaker 2.0 driver sets up MirrorSourceConnector.

源连接器apply SMT immediately after polling records (there are no converters (e.g. ByteArrayConverter or JsonConverter) at this steps: they are used after 已应用 SMT)。

具有架构的记录的 SourceRecord value are represented as a byte array with BYTES_SCHEMA schema. At the same time InsertField transformation requires Type.STRUCT

因此,由于无法确定记录为 Struct,因此未应用转换。

参考资料

  1. KIP-382: MirrorMaker 2.0
  2. How to Use Single Message Transforms in Kafka Connect

其他资源

  1. Docker-compose playground for MirrorMaker 2.0