Add a new column to Cosmos DB JSON data using Azure Data Factory
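We need to add a few columns to JSON data in Cosmos DB, but we are dealing with a complex data type (addresses). Can someone guide me on how to add the new columns and transform the data? We have a large volume of data (millions of records) to convert.
Here is a sample JSON document: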
{
"id": "108-406-004",
"Title": "Mr",
"FirstName": "John",
"LastName": "Smith",
"Addresses": [
{
"Line1": "5 London Road",
"Country": "UNITED KINGDOM"
}
]
}
I need to add a new column ("Line1LowerCase") to the Addresses array and set its value to lower(Line1), so the output should look like this:
{
"id": "108-406-004",
"Title": "Mr",
"FirstName": "John",
"LastName": "Smith",
"Addresses": [
{
"Line1": "5 London Road",
"Country": "UNITED KINGDOM",
"Line1LowerCase": "5 london road"
}
]
}
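To pin down the intended result, the per-document transformation can be sketched in plain Python. This is not Data Factory code; the function name `add_line1_lowercase` is hypothetical, and the sketch assumes every entry in `Addresses` is an object that may carry a string `Line1`.

```python
import copy


def add_line1_lowercase(document):
    """Return a copy of the document with Line1LowerCase added to each address."""
    result = copy.deepcopy(document)  # leave the input document untouched
    for address in result.get("Addresses", []):
        line1 = address.get("Line1")
        if isinstance(line1, str):
            # Keep the original Line1 and add the lowercase variant next to it.
            address["Line1LowerCase"] = line1.lower()
    return result


sample = {
    "id": "108-406-004",
    "Title": "Mr",
    "FirstName": "John",
    "LastName": "Smith",
    "Addresses": [{"Line1": "5 London Road", "Country": "UNITED KINGDOM"}],
}

converted = add_line1_lowercase(sample)
print(converted["Addresses"][0]["Line1LowerCase"])
```

The array stays an array of objects, which is the shape the expected output above requires.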
Here is what I have tried so far, but the Addresses array comes out like this:
"Addresses": {
"Line1": [
"5 London Road"
],
"Country": [
"UNITED KINGDOM"
],
"Addressline1Internal": "[\"5 london road\"]",
}
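I use Data Factory to import the data from a Cosmos DB dataset, add a Derived Column step to transform the JSON data, and sink the result to a Cosmos DB dataset.
If you are trying to update the document with id "108-406-004" to add a new property, you can use a Copy activity with the "Write behavior" option set to "Upsert". With this option, the new property is added to the record in Cosmos DB automatically.
Here is how I tested your case:
- My Cosmos DB already contained your first JSON sample (without the "Line1LowerCase" property).
- A new Copy activity was created with the "Upsert" option and without any mapping in the Copy activity.
- After running the Copy activity with the JSON that has the new property, the new property was added in Cosmos DB automatically.
Also, no mapping is defined in the source or sink dataset.
Source code: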
{
"name": "pipeline3",
"properties": {
"activities": [
{
"name": "Copy data1",
"type": "Copy",
"dependsOn": [],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "JsonSource",
"storeSettings": {
"type": "AzureBlobStorageReadSettings",
"recursive": true,
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "JsonReadSettings"
}
},
"sink": {
"type": "CosmosDbSqlApiSink",
"writeBehavior": "upsert",
"disableMetricsCollection": false
},
"enableStaging": false
},
"inputs": [
{
"referenceName": "LandingJson",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "CosmosDbSqlApiCollection1",
"type": "DatasetReference"
}
]
}
],
"annotations": []
}
}
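The effect of the "upsert" write behavior can be illustrated with a small in-memory sketch. This is only an analogy: the `container` dictionary and the `upsert` helper are hypothetical stand-ins, and the sketch matches documents on `id` alone, whereas Cosmos DB also takes the partition key into account.

```python
def upsert(container, document):
    """Insert the document, or replace the stored one that has the same id."""
    # The stored document is replaced as a whole; properties are not merged.
    container[document["id"]] = dict(document)
    return container


container = {}  # hypothetical stand-in for a Cosmos DB container, keyed by id

original = {
    "id": "108-406-004",
    "Title": "Mr",
    "Addresses": [{"Line1": "5 London Road", "Country": "UNITED KINGDOM"}],
}
updated = {
    "id": "108-406-004",
    "Title": "Mr",
    "Addresses": [
        {
            "Line1": "5 London Road",
            "Country": "UNITED KINGDOM",
            "Line1LowerCase": "5 london road",
        }
    ],
}

upsert(container, original)  # first write: no such id yet, so it is inserted
upsert(container, updated)   # second write: same id, so the document is replaced

print(len(container))
print(container["108-406-004"]["Addresses"][0]["Line1LowerCase"])
```

Because the whole document is replaced, the incoming JSON should carry every property that needs to survive, not only the new one.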
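How to convert the value to lowercase
Data Factory does not let you transform properties in a Copy activity.
For that, you can use a data flow and then call that data flow from your pipeline.
Here is a sample data flow that replaces that simple Copy activity:
- Source from JSON
- Select to pull the properties you need
- Derived Column to convert the value to lowercase
- Alter Row step to allow upsert
- Sink to Cosmos DB
Source code: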
{
"name": "dataflow3",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "LandingJson",
"type": "DatasetReference"
},
"name": "SourceJson"
}
],
"sinks": [
{
"dataset": {
"referenceName": "CosmosDbSqlApiCollection1",
"type": "DatasetReference"
},
"name": "SinkCosmos"
}
],
"transformations": [
{
"name": "Select"
},
{
"name": "DerivedColumn"
},
{
"name": "AlterRow"
}
],
"script": "source(output(\n\t\tid as string,\n\t\tTitle as string,\n\t\tFirstName as string,\n\t\tLastName as string,\n\t\tAddresses as (Line1 as string, Country as string)[]\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tignoreNoFilesFound: false,\n\tdocumentForm: 'arrayOfDocuments') ~> SourceJson\nSourceJson select(mapColumn(\n\t\tid,\n\t\tTitle,\n\t\tFirstName,\n\t\tLastName,\n\t\tLine1 = Addresses[1].Line1\n\t),\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> Select\nSelect derive(Line1LowerCase = lower(Line1)) ~> DerivedColumn\nDerivedColumn alterRow(upsertIf(true())) ~> AlterRow\nAlterRow sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tdeletable:false,\n\tinsertable:false,\n\tupdateable:false,\n\tupsertable:true,\n\tformat: 'document',\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> SinkCosmos"
}
}
}
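As a reading aid, the Select and Derived Column steps of the script above can be mirrored in plain Python. The helper names `select_columns` and `derive_lowercase` are hypothetical, and this is a sketch of how the script reads, not of how Data Factory executes it. Note that data flow array indexes are 1-based, so `Addresses[1]` in the script is the first element, which is index 0 in Python.

```python
def select_columns(document):
    """Mirror the Select step: keep the scalar columns and pull Line1 up."""
    addresses = document.get("Addresses") or []
    # Addresses[1] in the data flow script is the first element (1-based).
    first = addresses[0] if addresses else {}
    return {
        "id": document["id"],
        "Title": document.get("Title"),
        "FirstName": document.get("FirstName"),
        "LastName": document.get("LastName"),
        "Line1": first.get("Line1"),
    }


def derive_lowercase(row):
    """Mirror the Derived Column step: Line1LowerCase = lower(Line1)."""
    line1 = row.get("Line1")
    lowered = line1.lower() if isinstance(line1, str) else None
    return {**row, "Line1LowerCase": lowered}


sample = {
    "id": "108-406-004",
    "Title": "Mr",
    "FirstName": "John",
    "LastName": "Smith",
    "Addresses": [{"Line1": "5 London Road", "Country": "UNITED KINGDOM"}],
}

row = derive_lowercase(select_columns(sample))
print(row)
```

Read this way, the row that reaches the sink is flat: `Line1` and `Line1LowerCase` sit at the top level and `Addresses` is not among the selected columns. If the nested shape from the question is required, the array would have to be rebuilt before the sink, which this sketch does not attempt.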