Kafka:将主题 A 复制到主题 B,同时对记录应用转换
Kafka: Replicate topic A to topic B while applying a transformation to the records
我需要将记录从集群 A 上的主题镜像到集群 B 上的主题同时在代理时向记录添加字段(例如 InsertField
).
我不控制集群 A(但可能需要更改)并且完全控制集群 B。
我知道集群 A 正在发送序列化 JSON。
我正在使用 MirrorMaker API 和 Kafka 连接来进行镜像,我正在尝试使用 InsertField
转换在代理时在记录上添加数据。
我的配置是这样的:
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
topics=.*
source.cluster.alias=upstream
source.cluster.bootstrap.servers=source:9092
target.cluster.bootstrap.servers=target:9092
# ByteArrayConverter to avoid MirrorMaker to re-encode messages
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
transforms=InsertSource1
transforms.InsertSource1.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource1.static.field=test_inser
transforms.InsertSource1.static.value=test_value
name=somerandomname
此代码将失败并显示错误:
org.apache.kafka.connect.errors.DataException: Only Struct objects
supported for [field insertion]
有没有一种方法可以在不编写自定义转换的情况下实现这一点(我正在使用 Python 并且我不熟悉 Java)
非常感谢
如评论所述,字节数组转换器没有 Struct/Schema 信息,因此无法使用您正在使用的转换(添加字段)。
这并不意味着不能使用转换,但是
如果您要发送 JSON 消息,则必须发送架构和负载信息。
在 Apache Kafka
(2.6.0) 的当前版本中,您 无法将 InsertField 单个消息转换 (SMT) 应用到 MirrorMaker 2.0
记录。
说明
MirrorMaker 2.0
is based on Kafka Connect
framework and, internally, the MirrorMaker 2.0 driver sets up MirrorSourceConnector
.
源连接器apply SMT immediately after polling records (there are no converters (e.g. ByteArrayConverter
or JsonConverter
) at this steps: they are used after 已应用 SMT)。
具有架构的记录的 SourceRecord
value are represented as a byte array with BYTES_SCHEMA
schema. At the same time InsertField
transformation requires Type.STRUCT
。
因此,由于无法确定记录为 Struct
,因此未应用转换。
参考资料
其他资源
我需要将记录从集群 A 上的主题镜像到集群 B 上的主题同时在代理时向记录添加字段(例如 InsertField
).
我不控制集群 A(但可能需要更改)并且完全控制集群 B。
我知道集群 A 正在发送序列化 JSON。
我正在使用 MirrorMaker API 和 Kafka 连接来进行镜像,我正在尝试使用 InsertField
转换在代理时在记录上添加数据。
我的配置是这样的:
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
topics=.*
source.cluster.alias=upstream
source.cluster.bootstrap.servers=source:9092
target.cluster.bootstrap.servers=target:9092
# ByteArrayConverter to avoid MirrorMaker to re-encode messages
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
transforms=InsertSource1
transforms.InsertSource1.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource1.static.field=test_inser
transforms.InsertSource1.static.value=test_value
name=somerandomname
此代码将失败并显示错误:
org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [field insertion]
有没有一种方法可以在不编写自定义转换的情况下实现这一点(我正在使用 Python 并且我不熟悉 Java)
非常感谢
如评论所述,字节数组转换器没有 Struct/Schema 信息,因此无法使用您正在使用的转换(添加字段)。
这并不意味着不能使用转换,但是
如果您要发送 JSON 消息,则必须发送架构和负载信息。
在 Apache Kafka
(2.6.0) 的当前版本中,您 无法将 InsertField 单个消息转换 (SMT) 应用到 MirrorMaker 2.0
记录。
说明
MirrorMaker 2.0
is based on Kafka Connect
framework and, internally, the MirrorMaker 2.0 driver sets up MirrorSourceConnector
.
源连接器apply SMT immediately after polling records (there are no converters (e.g. ByteArrayConverter
or JsonConverter
) at this steps: they are used after 已应用 SMT)。
具有架构的记录的 SourceRecord
value are represented as a byte array with BYTES_SCHEMA
schema. At the same time InsertField
transformation requires Type.STRUCT
。
因此,由于无法确定记录为 Struct
,因此未应用转换。
参考资料
其他资源