Debezium 发件箱模式 属性 transforms.outbox.table.expand.json.payload 不工作

Debezium Outbox Pattern property transforms.outbox.table.expand.json.payload not working

我正在使用 debezium postgres 连接器实现发件箱模式,建立在官方文档的基础上:https://debezium.io/documentation/reference/stable/transformations/outbox-event-router.html

一切正常 - 除了 属性“transforms.outbox.table.expand.json.payload: true”不工作。

使用以下数据库记录(SQL 插入):

INSERT INTO public.outbox_event_entity (id, event_id, "key", payload, topic, "type") VALUES(0, 'e09d6355-8e7c-4055-936c-4f997423925e', '1', '{"key":"value"}'::jsonb, 'topic', 'NEW_EVENT');

生成的记录的有效负载包含转义字符串 json 而不是真正的 json 字段:

"{\"key\": \"value\"}"

我正在使用这个配置:

spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  config:
    value.converter: org.apache.kafka.connect.json.JsonConverter
    table.include.list: public.outbox_event_entity
    transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
    publication.autocreate.mode: FILTERED
    plugin.name: pgoutput
    transforms: outbox
    transforms.outbox.table.fields.additional.placement: 'type:header,event_id:header,timestamp_created:header'
    value.converter.schemas.enable: false
    transforms.outbox.table.field.event.key: id
    topic: topic
    key.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.outbox.route.by.field: topic
    transforms.outbox.table.expand.json.payload: true
    connector.class: io.debezium.connector.postgresql.PostgresConnector
    include.schema.changes: false
    file: /opt/kafka/LICENSE
    key.converter.schemas.enable: false
    [database properties omitted]

有人能指出我的错误吗?

此致 安迪

我 运行 在这里遇到了同样的问题,并找到了使用不同值转换器的解决方案。例如,我之前输出到 kafka 的内容是这样的:

"{\"id\": 3}"

但是在更新到连接器配置后:

"value.converter": "org.apache.kafka.connect.storage.StringConverter"

我对相同负载的输出如下所示:

{"id": 3}