AWS DMS CDC - 只捕获更改的值而不是整个记录? (来源 RDS MySQL)

AWS DMS CDC - Only capture changed values not entire record? (Source RDS MySQL)

我有一个 DMS CDC 任务集(更改数据捕获)从 MySQL 数据库流式传输到 Lambda 连接到的 Kinesis 流。

我希望最终只收到已更改的值,而不是整个行的转储,这样我就知道正在更改的列(目前,如果不设置另一个系统来解密它是不可能的我自己跟踪更改)。

示例,具有以下映射规则:

     {
        "rule-type": "selection",
        "rule-id": "1",
        "rule-name": "1",
        "object-locator": {
            "schema-name": "my-schema",
            "table-name": "product"
        },
        "rule-action": "include",
        "filters": []
    },

如果我更改了产品 table 上记录的名称 属性,我希望收到这样的记录:

{
    "data": {
        "name": "newValue"
    },
    "metadata": {
        "timestamp": "2021-07-26T06:47:15.762584Z",
        "record-type": "data",
        "operation": "update",
        "partition-key-type": "schema-table",
        "schema-name": "my-schema",
        "table-name": "product",
        "transaction-id": 8633730840
    }
}

然而我实际收到的是这样的:

{
    "data": {
        "name": "newValue",
        "id": "unchangedId",
        "quantity": "unchangedQuantity",
        "otherProperty": "unchangedValue"
    },
    "metadata": {
        "timestamp": "2021-07-26T06:47:15.762584Z",
        "record-type": "data",
        "operation": "update",
        "partition-key-type": "schema-table",
        "schema-name": "my-schema",
        "table-name": "product",
        "transaction-id": 8633730840
    }
}

正如您在收到此消息时所看到的,如果不设置额外的系统来跟踪它,就不可能破译 属性 发生了什么变化。

我发现另一个 有人发布了一个问题,因为他们的 CDC 正在做 我希望我做的事情。谁能指出我实现这一目标的正确方向?

我在深入研究 AWS 文档后找到了答案。

https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html#CHAP_Target.Kinesis.BeforeImage

Different source database engines provide different amounts of information for a before image:

  • Oracle provides updates to columns only if they change.

  • PostgreSQL provides only data for columns that are part of the primary key (changed or not).

  • MySQL generally provides data for all columns (changed or not).

我在任务设置中使用 BeforeImageSettings 来包含带有有效负载的原始数据。

"BeforeImageSettings": {
    "EnableBeforeImage": true,
    "FieldName": "before-image",
    "ColumnFilter": "all"
}

虽然这仍然为我提供了完整的记录,但它为我提供了足够的数据来计算出在没有其他系统的情况下发生了什么变化。

{
    "data": {
        "name": "newValue",
        "id": "unchangedId",
        "quantity": "unchangedQuantity",
        "otherProperty": "unchangedValue"
    },
    "before-image": {
        "name": "oldValue",
        "id": "unchangedId",
        "quantity": "unchangedQuantity",
        "otherProperty": "unchangedValue"
    },
    "metadata": {
        "timestamp": "2021-07-26T06:47:15.762584Z",
        "record-type": "data",
        "operation": "update",
        "partition-key-type": "schema-table",
        "schema-name": "my-schema",
        "table-name": "product",
        "transaction-id": 8633730840
    }
}