具有自定义规则的 Amazon DMS 任务在下沉到 Kinesis 时失败

Amazon DMS task with custom rules fails when sinking to Kinesis

我正在尝试使用 Amazon DMS 监听 Aurora 数据库更改并将更改推送到 Kinesis 流,其中监听流的 Lambda 函数将进行处理。

我参考了以下文档来编写我的规则。

https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html https://aws.amazon.com/blogs/database/use-the-aws-database-migration-service-to-stream-change-data-to-amazon-kinesis-data-streams/

这是我的 DMS 持续复制 (CDC) 任务的规则映射。

{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": {
                "schema-name": "my_db",
                "table-name": "my_table"
            },
            "rule-action": "include"
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "2",
            "rule-action": "map-record-to-record",
            "object-locator": {
                "schema-name": "my_db",
                "table-name": "my_table"
            },
            "mapping-parameters": {
                "partition-key": {
                    "attribute-name": "my_id",
                    "value": "${my_id}"
                }
            }
        }
    ]
}

但是,当我更改源 table 时,DMS 任务失败并出现以下错误。

2019-02-05T10:36:55 [TARGET_APPLY ]E: Error allocating memory for Json document [1020100] (field_mapping_utils.c:382)
2019-02-05T10:36:55 [TARGET_APPLY ]E: Failed while looking for object mapping for table my_table [1020100] (kinesis_utils.c:258)
2019-02-05T10:36:55 [TARGET_APPLY ]E: Error executing data handler [1020100] (streamcomponent.c:1778)
2019-02-05T10:36:55 [TASK_MANAGER ]E: Stream component failed at subtask 0, component st_0_some_random_id [1020100] (subtask.c:1366)
2019-02-05T10:36:55 [TASK_MANAGER ]E: Task error notification received from subtask 0, thread 1 [1020100] (replicationtask.c:2661)
2019-02-05T10:36:55 [TASK_MANAGER ]W: Task 'some_random_task_id' encountered a fatal error (repository.c:4704)

当我在没有 object-mapping 规则的情况下尝试时,Kinesis 将获得一条包含 "partitionKey": "my_db.my_table" 且具有正确值的记录,这是 table-to-[ 的默认行为=53=] 下沉,但我们需要 table-to-kinesis 下沉。

为什么我这么关心partition-key?因为我需要利用我的 Kinesis 流中的所有分片。

有人可以帮助我吗?

更新:

当我将"partition-key-type": "schema-table"添加到"mapping-parameters"时,它不会失败,任务不会失败,但会忽略"partition-key"属性并且会有"partitionKey": "my_db.my_table" 和以前一样。

不确定点:

  1. 在table-to-table下沉中,它使用"partition-key-type": "schema-table",但从未提及table-to-kinesis的价值。
  2. 文档中的示例和解释非常有限,甚至有错误(即某些规则 JSON 无效)

所以,我在这里回答我自己的问题。

我们联系了 AWS 支持团队,他们说这是他们方面的问题,而且文档也没有反映确切的功能。他们也在内部提出了一个问题,并在未来解决它。

目前,由于 DMS 无法满足我们的期望,我们决定转向其他解决方案。