How to copy Cosmos DB documents to Blob storage (each document in its own JSON file) with Azure Data Factory
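I'm trying to back up my Cosmos DB storage using Azure Data Factory (v2). In general it does its job, but I want each document in the Cosmos collection to correspond to a new JSON file in Blob storage.
With the following copy parameters I can copy all documents in a collection into a single file in Azure Blob storage: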
{
"name": "ForEach_mih",
"type": "ForEach",
"typeProperties": {
"items": {
"value": "@pipeline().parameters.cw_items",
"type": "Expression"
},
"activities": [
{
"name": "Copy_mih",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false
},
"userProperties": [
{
"name": "Source",
"value": "@{item().source.collectionName}"
},
{
"name": "Destination",
"value": "cosmos-backup-v2/@{item().destination.fileName}"
}
],
"typeProperties": {
"source": {
"type": "DocumentDbCollectionSource",
"nestingSeparator": "."
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"enableSkipIncompatibleRow": true,
"redirectIncompatibleRowSettings": {
"linkedServiceName": {
"referenceName": "Clear_Test_BlobStorage",
"type": "LinkedServiceReference"
},
"path": "cosmos-backup-logs"
},
"cloudDataMovementUnits": 0
},
"inputs": [
{
"referenceName": "SourceDataset_mih",
"type": "DatasetReference",
"parameters": {
"cw_collectionName": "@item().source.collectionName"
}
}
],
"outputs": [
{
"referenceName": "DestinationDataset_mih",
"type": "DatasetReference",
"parameters": {
"cw_fileName": "@item().destination.fileName"
}
}
]
}
]
}
}
How can I copy each Cosmos document to a separate file and name it {PartitionId}-{docId}?
UPD
Source dataset code:
{
"name": "ClustersData",
"properties": {
"linkedServiceName": {
"referenceName": "Clear_Test_CosmosDb",
"type": "LinkedServiceReference"
},
"type": "DocumentDbCollection",
"typeProperties": {
"collectionName": "directory-clusters"
}
}
}
Destination dataset code:
{
"name": "OutputClusters",
"properties": {
"linkedServiceName": {
"referenceName": "Clear_Test_BlobStorage",
"type": "LinkedServiceReference"
},
"type": "AzureBlob",
"typeProperties": {
"format": {
"type": "JsonFormat",
"filePattern": "arrayOfObjects"
},
"fileName": "",
"folderPath": "cosmos-backup-logs"
}
}
}
Pipeline code:
{
"name": "copy-clsts",
"properties": {
"activities": [
{
"name": "LookupClst",
"type": "Lookup",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false
},
"typeProperties": {
"source": {
"type": "DocumentDbCollectionSource",
"nestingSeparator": "."
},
"dataset": {
"referenceName": "ClustersData",
"type": "DatasetReference"
},
"firstRowOnly": false
}
},
{
"name": "ForEachClst",
"type": "ForEach",
"dependsOn": [
{
"activity": "LookupClst",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"items": {
"value": "@activity('LookupClst').output.value",
"type": "Expression"
},
"batchCount": 8,
"activities": [
{
"name": "CpyClst",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false
},
"typeProperties": {
"source": {
"type": "DocumentDbCollectionSource",
"query": "select @{item()}",
"nestingSeparator": "."
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"enableSkipIncompatibleRow": true,
"cloudDataMovementUnits": 0
},
"inputs": [
{
"referenceName": "ClustersData",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "OutputClusters",
"type": "DatasetReference"
}
]
}
]
}
}
]
}
}
Sample document from the input collection (all documents have the same format):
{
"$type": "Entities.ADCluster",
"DisplayName": "TESTNetBIOS",
"OrgId": "9b679d2a-42c5-4c9a-a2e2-3ce63c1c3506",
"ClusterId": "ab2a242d-f1a5-62ed-b420-31b52e958586",
"AllowLdapLifeCycleSynchronization": true,
"DirectoryServers": [
{
"$type": "Entities.DirectoryServer",
"AddressId": "e6a8edbb-ad56-4135-94af-fab50b774256",
"Port": 389,
"Host": "192.168.342.234"
}
],
"DomainNames": [
"TESTNetBIOS"
],
"BaseDn": null,
"UseSsl": false,
"RepositoryType": 1,
"DirectoryCustomizations": null,
"_etag": "\"140046f2-0000-0000-0000-5ac63a180000\"",
"LastUpdateTime": "2018-04-05T15:00:40.243Z",
"id": "ab2a242d-f1a5-62ed-b420-31b52e958586",
"PartitionKey": "directory-clusters-9b679d2a-42c5-4c9a-a2e2-3ce63c1c3506",
"_rid": "kpvxLAs6gkmsCQAAAAAAAA==",
"_self": "dbs/kvpxAA==/colls/kpvxLAs6gkk=/docs/kvpxALs6kgmsCQAAAAAAAA==/",
"_attachments": "attachments/",
"_ts": 1522940440
}
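Both your Lookup activity and the source of your Copy activity reference the same Cosmos DB dataset.
If you want to copy your 5 collections, you can put this pipeline inside an Execute Pipeline activity and give the master pipeline that hosts it a ForEach activity.
Since your Cosmos DB documents contain arrays, and ADF does not support serializing arrays for Cosmos DB, this is the workaround I can offer.
First, export all your documents to JSON files, exporting the JSON as-is (to Blob, ADLS, or a file system; any file store will do). I think you already know how to do that. This way, each collection ends up with one JSON file.
Second, process each JSON file and extract every row in it into a file of its own.
I only provide the pipeline for step 2. You can use an Execute Pipeline activity to chain step 1 and step 2, and you could even use a ForEach activity to handle all the collections in step 2.
Pipeline JSON: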
{
"name": "pipeline27",
"properties": {
"activities": [
{
"name": "Lookup1",
"type": "Lookup",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false
},
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": true
},
"dataset": {
"referenceName": "AzureBlob7",
"type": "DatasetReference"
},
"firstRowOnly": false
}
},
{
"name": "ForEach1",
"type": "ForEach",
"dependsOn": [
{
"activity": "Lookup1",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"items": {
"value": "@activity('Lookup1').output.value",
"type": "Expression"
},
"activities": [
{
"name": "Copy1",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false
},
"typeProperties": {
"source": {
"type": "DocumentDbCollectionSource",
"query": {
"value": "select @{item()}",
"type": "Expression"
},
"nestingSeparator": "."
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"cloudDataMovementUnits": 0
},
"inputs": [
{
"referenceName": "DocumentDbCollection1",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureBlob6",
"type": "DatasetReference",
"parameters": {
"id": {
"value": "@item().id",
"type": "Expression"
},
"PartitionKey": {
"value": "@item().PartitionKey",
"type": "Expression"
}
}
}
]
}
]
}
}
]
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
Dataset JSON for the Lookup:
{
"name": "AzureBlob7",
"properties": {
"linkedServiceName": {
"referenceName": "bloblinkedservice",
"type": "LinkedServiceReference"
},
"type": "AzureBlob",
"typeProperties": {
"format": {
"type": "JsonFormat",
"filePattern": "arrayOfObjects"
},
"fileName": "cosmos.json",
"folderPath": "aaa"
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
Source dataset for the Copy. This dataset is not actually used for its data; it is only there to host the query (select @{item()}).
{
"name": "DocumentDbCollection1",
"properties": {
"linkedServiceName": {
"referenceName": "CosmosDB-r8c",
"type": "LinkedServiceReference"
},
"type": "DocumentDbCollection",
"typeProperties": {
"collectionName": "test"
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
Destination dataset. With its two parameters, it also takes care of your file name requirement.
{
"name": "AzureBlob6",
"properties": {
"linkedServiceName": {
"referenceName": "AzureStorage-eastus",
"type": "LinkedServiceReference"
},
"parameters": {
"id": {
"type": "String"
},
"PartitionKey": {
"type": "String"
}
},
"type": "AzureBlob",
"typeProperties": {
"format": {
"type": "JsonFormat",
"filePattern": "setOfObjects"
},
"fileName": {
"value": "@{dataset().PartitionKey}-@{dataset().id}.json",
"type": "Expression"
},
"folderPath": "aaacosmos"
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
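To make the effect of step 2 concrete outside ADF, here is a minimal local sketch in Python. It is not part of the pipeline above: the function name `split_documents` is made up for illustration, and it assumes every document carries the `PartitionKey` and `id` fields shown in the sample document and that neither value contains characters that are invalid in a file name.

```python
import json
import tempfile
from pathlib import Path


def split_documents(array_file: Path, out_dir: Path) -> list[Path]:
    """Write each object of a JSON array file to its own file.

    Files are named {PartitionKey}-{id}.json, mirroring the fileName
    expression of the destination dataset.
    """
    docs = json.loads(array_file.read_text(encoding="utf-8"))
    out_dir.mkdir(parents=True, exist_ok=True)
    written = []
    for doc in docs:
        target = out_dir / f"{doc['PartitionKey']}-{doc['id']}.json"
        # Nested arrays are kept as-is because the whole object is re-serialized.
        target.write_text(json.dumps(doc, ensure_ascii=False), encoding="utf-8")
        written.append(target)
    return written


if __name__ == "__main__":
    # Small demonstration with a throwaway directory and two fake documents.
    with tempfile.TemporaryDirectory() as tmp:
        source = Path(tmp) / "cosmos.json"
        sample = [
            {"id": "a", "PartitionKey": "p1", "DomainNames": ["x"]},
            {"id": "b", "PartitionKey": "p2", "DomainNames": []},
        ]
        source.write_text(json.dumps(sample), encoding="utf-8")
        for path in split_documents(source, Path(tmp) / "out"):
            print(path.name)
```

The sketch loads the whole array into memory, so it only suits export files of moderate size.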
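Also note the limitations of the Lookup activity:
The following data sources are supported for Lookup. The Lookup activity can return at most 5000 rows and up to 2 MB of data. Currently, the maximum duration of a Lookup activity before it times out is one hour.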
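Have you considered implementing this a different way, using Azure Functions? ADF is designed for moving data in bulk from one place to another, and it only generates a single file per collection.
You could have an Azure Function that is triggered whenever a document is added or updated in your collection, and have the function write that document to Blob storage. This should scale well and be relatively easy to implement.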
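As a rough illustration of the logic such a function would run, here is a sketch in Python. It deliberately leaves out the Azure Functions trigger binding and the Blob storage SDK: `export_changed_documents` and the `upload` callable are hypothetical names, and the `{PartitionKey}-{id}.json` naming is taken from the question, so treat it as an outline rather than a working function app.

```python
import json
from typing import Callable, Iterable, Mapping


def export_changed_documents(
    documents: Iterable[Mapping],
    upload: Callable[[str, bytes], None],
) -> int:
    """Send every changed document to storage as its own JSON blob.

    `documents` stands for the batch a change-triggered function receives.
    `upload` is a placeholder for whatever writes bytes to a blob name.
    Returns the number of documents handled.
    """
    count = 0
    for doc in documents:
        blob_name = f"{doc['PartitionKey']}-{doc['id']}.json"
        body = json.dumps(dict(doc), ensure_ascii=False).encode("utf-8")
        upload(blob_name, body)
        count += 1
    return count


if __name__ == "__main__":
    # Stand-in for Blob storage: collect the uploads in a dictionary.
    fake_storage: dict[str, bytes] = {}
    changed = [{"id": "1", "PartitionKey": "clusters", "UseSsl": False}]
    handled = export_changed_documents(changed, fake_storage.__setitem__)
    print(handled, sorted(fake_storage))
```

Because an updated document maps to the same blob name, each update overwrites the previous copy, which fits a backup of the latest state.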
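I struggled with this as well, especially with getting around the size limits of the Lookup activity, since we have a lot of data to migrate. I ended up creating a JSON file containing a list of timestamp ranges to query the Cosmos data with. For each range I fetch the IDs of the documents that fall within it, and then for each ID I fetch the full document and save it to a path such as PartitionKey/DocumentID. Here are the pipelines I created:
LookupTimestamps - loops through each timestamp range in the times.json file and executes the ExportFromCosmos pipeline for each one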
{
"name": "LookupTimestamps",
"properties": {
"activities": [
{
"name": "LookupTimestamps",
"type": "Lookup",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": false
},
"dataset": {
"referenceName": "BlobStorageTimestamps",
"type": "DatasetReference"
},
"firstRowOnly": false
}
},
{
"name": "ForEachTimestamp",
"type": "ForEach",
"dependsOn": [
{
"activity": "LookupTimestamps",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"items": {
"value": "@activity('LookupTimestamps').output.value",
"type": "Expression"
},
"isSequential": false,
"activities": [
{
"name": "Execute Pipeline1",
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "ExportFromCosmos",
"type": "PipelineReference"
},
"waitOnCompletion": true,
"parameters": {
"From": {
"value": "@{item().From}",
"type": "Expression"
},
"To": {
"value": "@{item().To}",
"type": "Expression"
}
}
}
}
]
}
}
]
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
ExportFromCosmos - the nested pipeline that is executed from the pipeline above. It exists to work around the fact that ForEach activities cannot be nested.
{
"name": "ExportFromCosmos",
"properties": {
"activities": [
{
"name": "LookupDocuments",
"type": "Lookup",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "DocumentDbCollectionSource",
"query": {
"value": "select c.id, c.partitionKey from c where c._ts >= @{pipeline().parameters.from} and c._ts <= @{pipeline().parameters.to} order by c._ts desc",
"type": "Expression"
},
"nestingSeparator": "."
},
"dataset": {
"referenceName": "CosmosDb",
"type": "DatasetReference"
},
"firstRowOnly": false
}
},
{
"name": "ForEachDocument",
"type": "ForEach",
"dependsOn": [
{
"activity": "LookupDocuments",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"items": {
"value": "@activity('LookupDocuments').output.value",
"type": "Expression"
},
"activities": [
{
"name": "Copy1",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "DocumentDbCollectionSource",
"query": {
"value": "select * from c where c.id = \"@{item().id}\" and c.partitionKey = \"@{item().partitionKey}\"",
"type": "Expression"
},
"nestingSeparator": "."
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false
},
"inputs": [
{
"referenceName": "CosmosDb",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "BlobStorageDocuments",
"type": "DatasetReference",
"parameters": {
"id": {
"value": "@item().id",
"type": "Expression"
},
"partitionKey": {
"value": "@item().partitionKey",
"type": "Expression"
}
}
}
]
}
]
}
}
],
"parameters": {
"from": {
"type": "int"
},
"to": {
"type": "int"
}
}
}
}
BlobStorageTimestamps - dataset for the times.json file
{
"name": "BlobStorageTimestamps",
"properties": {
"linkedServiceName": {
"referenceName": "AzureBlobStorage1",
"type": "LinkedServiceReference"
},
"type": "AzureBlob",
"typeProperties": {
"format": {
"type": "JsonFormat",
"filePattern": "arrayOfObjects"
},
"fileName": "times.json",
"folderPath": "mycollection"
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
BlobStorageDocuments - dataset for where the documents are saved
{
"name": "BlobStorageDocuments",
"properties": {
"linkedServiceName": {
"referenceName": "AzureBlobStorage1",
"type": "LinkedServiceReference"
},
"parameters": {
"id": {
"type": "string"
},
"partitionKey": {
"type": "string"
}
},
"type": "AzureBlob",
"typeProperties": {
"format": {
"type": "JsonFormat",
"filePattern": "arrayOfObjects"
},
"fileName": {
"value": "@{dataset().partitionKey}/@{dataset().id}.json",
"type": "Expression"
},
"folderPath": "mycollection"
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
The times.json file is just a list of epoch time ranges and looks like this:
[{
"From": 1556150400,
"To": 1556236799
},
{
"From": 1556236800,
"To": 1556323199
}]
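Writing such a file by hand gets tedious for long periods, so here is a small Python sketch that generates it. The function name `daily_ranges` is hypothetical, and the one-day window is an assumption read off the sample values above (each `To` is 86399 seconds after its `From`). Presumably a window has to be short enough that the ID lookup for it stays within the Lookup limits.

```python
import json
from datetime import datetime, timezone

SECONDS_PER_DAY = 86_400


def daily_ranges(first_day_start: int, days: int) -> list[dict]:
    """Return `days` consecutive one-day ranges as From/To epoch seconds.

    Both bounds are inclusive, matching the `>=` and `<=` comparisons in
    the LookupDocuments query.
    """
    ranges = []
    for i in range(days):
        start = first_day_start + i * SECONDS_PER_DAY
        ranges.append({"From": start, "To": start + SECONDS_PER_DAY - 1})
    return ranges


if __name__ == "__main__":
    # 2019-04-25 00:00:00 UTC is the first "From" value in the sample file.
    start = int(datetime(2019, 4, 25, tzinfo=timezone.utc).timestamp())
    print(json.dumps(daily_ranges(start, 2), indent=2))
```

Run as a script, this prints the same two ranges as the sample file; raising `days` extends the list.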