Add new column to Cosmos DB JSON data using Azure Data Factory

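We need to add a few columns to JSON data in Cosmos DB, but we have to handle a complex data type (addresses). Can someone guide me on how to add the new columns and transform the data? We have a large volume of data (millions of records) to transform.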

Here is the sample JSON data:

{
    "id": "108-406-004",
    "Title": "Mr",
    "FirstName": "John",
    "LastName": "Smith",
    "Addresses": [
        {
        "Line1": "5 London Road",
        "Country": "UNITED KINGDOM"
        }
    ]
}

I need to add a new column ("Line1LowerCase") to the Addresses array and set its value to lower(Line1), so the output should look like this:

{
    "id": "108-406-004",
    "Title": "Mr",
    "FirstName": "John",
    "LastName": "Smith",
    "Addresses": [
        {
        "Line1": "5 London Road",
        "Country": "UNITED KINGDOM",
        "Line1LowerCase": "5 london road"
        }
    ]
}
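For reference, the intended per-address transformation can be sketched in plain Python, outside Data Factory. This is only an illustration of the target shape; the function name `add_line1_lowercase` is made up for the example and is not part of any Azure SDK.

```python
import copy


def add_line1_lowercase(document):
    """Return a copy of the document with Line1LowerCase added to every address."""
    result = copy.deepcopy(document)  # leave the caller's document untouched
    for address in result.get("Addresses", []):
        line1 = address.get("Line1")
        if isinstance(line1, str):
            # The new property lives inside each address object, next to Line1.
            address["Line1LowerCase"] = line1.lower()
    return result


sample = {
    "id": "108-406-004",
    "Title": "Mr",
    "FirstName": "John",
    "LastName": "Smith",
    "Addresses": [{"Line1": "5 London Road", "Country": "UNITED KINGDOM"}],
}

transformed = add_line1_lowercase(sample)
print(transformed["Addresses"])
```

The point of the sketch is that Addresses stays an array of objects and each object gains one extra property.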

Here is what I have tried so far, but the Addresses array comes out like this:
"Addresses": {
        "Line1": [
            "5 London Road"
        ],
        "Country": [
            "UNITED KINGDOM"
        ],
        "Addressline1Internal": "[\"5 london road\"]",
    }
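One plausible reading of this output (an assumption, since the derived column expression itself is not shown) is that the fields were read across the whole array, column by column, rather than per element, and that the array of Line1 values was converted to a string before being lower-cased. The Python sketch below reproduces that shape; `project_column_wise` is a hypothetical name used only here.

```python
import json


def project_column_wise(addresses):
    """Read each field across the whole array instead of per element."""
    line1_values = [a["Line1"] for a in addresses]
    country_values = [a["Country"] for a in addresses]
    return {
        "Line1": line1_values,
        "Country": country_values,
        # Lower-casing an array forces it through a string conversion first,
        # so the result is a string that merely looks like an array.
        "Addressline1Internal": json.dumps(line1_values).lower(),
    }


addresses = [{"Line1": "5 London Road", "Country": "UNITED KINGDOM"}]
flattened = project_column_wise(addresses)
print(json.dumps(flattened, indent=4))
```

If this reading is right, the fix is to build the new value per array element rather than over the array as a whole.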

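I am using Data Factory to import the data from a Cosmos DB dataset, with a derived column step to transform the JSON data, and I sink the result into a Cosmos DB dataset.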

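If you are trying to update the document with id "108-406-004" to add a new property, you can use a Copy activity with the "Write behavior" option set to "Upsert". With this option, the new property is added to the record in Cosmos DB automatically.

This is how I tested your case: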

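  1. My Cosmos DB already contained the record from your first JSON example (without the "Line1LowerCase" property).

  2. I created a new Copy activity with the "Upsert" option and without any mapping in the Copy activity.

  3. After running the Copy activity with a JSON file that contains the new property, the new property was added to the record in Cosmos DB automatically.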

In addition, there is no mapping in the source and sink datasets.

Source code of the pipeline:

{
    "name": "pipeline3",
    "properties": {
        "activities": [
            {
                "name": "Copy data1",
                "type": "Copy",
                "dependsOn": [],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "JsonSource",
                        "storeSettings": {
                            "type": "AzureBlobStorageReadSettings",
                            "recursive": true,
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "JsonReadSettings"
                        }
                    },
                    "sink": {
                        "type": "CosmosDbSqlApiSink",
                        "writeBehavior": "upsert",
                        "disableMetricsCollection": false
                    },
                    "enableStaging": false
                },
                "inputs": [
                    {
                        "referenceName": "LandingJson",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "CosmosDbSqlApiCollection1",
                        "type": "DatasetReference"
                    }
                ]
            }
        ],
        "annotations": []
    }
}
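The upsert behavior used by the sink above can be pictured with a toy in-memory model. This is a sketch, not the Cosmos DB SDK: the `container` dict and the `upsert` function are hypothetical stand-ins, and the model keys documents by id only, whereas a real container also takes the partition key into account. What it illustrates is that an upsert inserts or replaces the whole document, so the incoming JSON has to carry every property you want to keep.

```python
def upsert(container, document):
    """Insert the document, or replace the stored one that has the same id."""
    existed = document["id"] in container
    container[document["id"]] = dict(document)  # whole-document replace, no merge
    return "replaced" if existed else "inserted"


container = {}  # hypothetical stand-in for a Cosmos DB container

first = upsert(container, {"id": "108-406-004", "Title": "Mr", "FirstName": "John"})
second = upsert(
    container,
    {"id": "108-406-004", "Title": "Mr", "FirstName": "John", "Line1LowerCase": "5 london road"},
)
# An incoming document that omits FirstName removes it from the stored record.
third = upsert(container, {"id": "108-406-004", "Title": "Mr"})

print(first, second, third)
print(container["108-406-004"])
```

In the test described above this works because the input file contains the full document plus the new property.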

How to convert the value to lowercase

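Data Factory does not let you transform properties inside a Copy activity. To do that, you can use a data flow and then call this data flow from your pipeline.

Here is an example data flow that replaces the simple Copy activity: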

Source from JSON

Select to pull the properties you need

Derived column to convert to lowercase

Alter row step to allow upsert

Sink into Cosmos DB

Source code of the data flow:

{
    "name": "dataflow3",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "LandingJson",
                        "type": "DatasetReference"
                    },
                    "name": "SourceJson"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "CosmosDbSqlApiCollection1",
                        "type": "DatasetReference"
                    },
                    "name": "SinkCosmos"
                }
            ],
            "transformations": [
                {
                    "name": "Select"
                },
                {
                    "name": "DerivedColumn"
                },
                {
                    "name": "AlterRow"
                }
            ],
            "script": "source(output(\n\t\tid as string,\n\t\tTitle as string,\n\t\tFirstName as string,\n\t\tLastName as string,\n\t\tAddresses as (Line1 as string, Country as string)[]\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tignoreNoFilesFound: false,\n\tdocumentForm: 'arrayOfDocuments') ~> SourceJson\nSourceJson select(mapColumn(\n\t\tid,\n\t\tTitle,\n\t\tFirstName,\n\t\tLastName,\n\t\tLine1 = Addresses[1].Line1\n\t),\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> Select\nSelect derive(Line1LowerCase = lower(Line1)) ~> DerivedColumn\nDerivedColumn alterRow(upsertIf(true())) ~> AlterRow\nAlterRow sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tdeletable:false,\n\tinsertable:false,\n\tupdateable:false,\n\tupsertable:true,\n\tformat: 'document',\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> SinkCosmos"
        }
    }
}
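To see what this script writes to the sink, its Select and DerivedColumn steps can be mirrored in plain Python. This is a rough sketch under the assumption that the sink writes exactly the columns leaving the derived column step; `select_step` and `derive_step` are illustrative names, not Data Factory APIs.

```python
def select_step(doc):
    """Mirror of the Select step: keep four scalars and lift Line1 out of the array."""
    addresses = doc.get("Addresses") or []
    return {
        "id": doc["id"],
        "Title": doc.get("Title"),
        "FirstName": doc.get("FirstName"),
        "LastName": doc.get("LastName"),
        # Addresses[1] in the data flow script is the first element (1-based);
        # the Python equivalent is index 0.
        "Line1": addresses[0].get("Line1") if addresses else None,
    }


def derive_step(row):
    """Mirror of the DerivedColumn step: Line1LowerCase = lower(Line1)."""
    line1 = row["Line1"]
    return {**row, "Line1LowerCase": line1.lower() if line1 is not None else None}


source_doc = {
    "id": "108-406-004",
    "Title": "Mr",
    "FirstName": "John",
    "LastName": "Smith",
    "Addresses": [{"Line1": "5 London Road", "Country": "UNITED KINGDOM"}],
}

sink_row = derive_step(select_step(source_doc))
print(sink_row)
```

Under that assumption the row is flat: Line1 and Line1LowerCase sit at the top level and the Addresses array is no longer present. If the nested shape from the question is required, a derived column that rebuilds the array per element may be a better fit, for example with the data flow `map()` function and its `#item` placeholder; that variant is untested here.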