Get Metadata, ForEach and Copy activity in Azure Data Factory
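I want to copy 2 tables from blob storage to a SQL database.
I created a pipeline like this:
Get Metadata: captures the files in the input container (2 csv files)
ForEach: iterates over the files in the input container
Copy activity: inside the ForEach. Copies the two files into the SQL database.
Now, when I start debugging, I get error 2200, which says userBlobDoesNotExists.
Here is the error output of the Copy activity: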
{
    "copyDuration": 3,
    "errors": [
        {
            "Code": 9013,
            "Message": "ErrorCode=UserErrorSourceBlobNotExist,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=The required Blob is missing. ContainerName: https://employeestorage1.blob.core.windows.net/employeeinput, path: employeeinput/workdetail.csv.,Source=Microsoft.DataTransfer.ClientLibrary,'",
            "EventType": 0,
            "Category": 5,
            "Data": {},
            "MsgId": null,
            "ExceptionType": null,
            "Source": null,
            "StackTrace": null,
            "InnerEventInfos": []
        }
    ],
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)",
    "usedDataIntegrationUnits": 4,
    "billingReference": {
        "activityType": "DataMovement",
        "billableDuration": [
            {
                "meterType": "AzureIR",
                "duration": 0.06666666666666667,
                "unit": "DIUHours"
            }
        ]
    },
    "usedParallelCopies": 1,
    "executionDetails": [
        {
            "source": {
                "type": "AzureBlobStorage",
                "region": "East US"
            },
            "sink": {
                "type": "AzureSqlDatabase",
                "region": "East US"
            },
            "status": "Failed",
            "start": "2021-06-24T17:28:09.4507134Z",
            "duration": 3,
            "usedDataIntegrationUnits": 4,
            "usedParallelCopies": 1,
            "profile": {
                "queue": {
                    "status": "Completed",
                    "duration": 2
                },
                "transfer": {
                    "status": "Completed",
                    "duration": 0
                }
            },
            "detailedDurations": {
                "queuingDuration": 2,
                "transferDuration": 0
            }
        }
    ],
    "dataConsistencyVerification": {
        "VerificationResult": "Unsupported"
    },
    "durationInQueue": {
        "integrationRuntimeQueue": 0
    }
}
Here is the pipeline code:
{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "inputfolder",
                "type": "GetMetadata",
                "dependsOn": [],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataset": {
                        "referenceName": "employeeinputdataset",
                        "type": "DatasetReference"
                    },
                    "fieldList": [
                        "childItems"
                    ],
                    "storeSettings": {
                        "type": "AzureBlobStorageReadSettings",
                        "enablePartitionDiscovery": false
                    },
                    "formatSettings": {
                        "type": "DelimitedTextReadSettings"
                    }
                }
            },
            {
                "name": "for each table in input folder",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "inputfolder",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('inputfolder').output.Childitems",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "Copy data1",
                            "type": "Copy",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "source": {
                                    "type": "DelimitedTextSource",
                                    "storeSettings": {
                                        "type": "AzureBlobStorageReadSettings",
                                        "recursive": true,
                                        "wildcardFolderPath": "employeeinput",
                                        "wildcardFileName": {
                                            "value": "@item().name",
                                            "type": "Expression"
                                        },
                                        "enablePartitionDiscovery": false
                                    },
                                    "formatSettings": {
                                        "type": "DelimitedTextReadSettings"
                                    }
                                },
                                "sink": {
                                    "type": "AzureSqlSink",
                                    "tableOption": "autoCreate",
                                    "disableMetricsCollection": false
                                },
                                "enableStaging": false,
                                "translator": {
                                    "type": "TabularTranslator",
                                    "typeConversion": true,
                                    "typeConversionSettings": {
                                        "allowDataTruncation": true,
                                        "treatBooleanAsNumber": false
                                    }
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "employeeinputdataset",
                                    "type": "DatasetReference"
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "employeeoutputsql",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "OutputTableName": {
                                            "value": "@item().name",
                                            "type": "Expression"
                                        }
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "annotations": []
    }
}
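In the Copy activity source, don't select "Wildcard file path" for the File path type setting; try "File path in dataset" instead.
You also need to create a parameter in the source dataset. In the source dataset's file path, enter the expression @dataset().fileName.
Finally, pass @item().name to that dataset parameter from the Copy activity, and it should work.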
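As a rough sketch of what that could look like in JSON, here is a parameterized source dataset. The container name and the dataset name come from the question; the linked service name is a placeholder, and the delimiter and header settings are assumptions that should match your actual files.

```json
{
    "name": "employeeinputdataset",
    "properties": {
        "linkedServiceName": {
            "referenceName": "<your-blob-linked-service>",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "fileName": {
                "type": "string"
            }
        },
        "type": "DelimitedText",
        "typeProperties": {
            "location": {
                "type": "AzureBlobStorageLocation",
                "container": "employeeinput",
                "fileName": {
                    "value": "@dataset().fileName",
                    "type": "Expression"
                }
            },
            "columnDelimiter": ",",
            "firstRowAsHeader": true
        }
    }
}
```

The Copy activity inside the ForEach would then drop the wildcard settings and pass the current file name to the dataset. This is an abbreviated view showing only the parts that change; the sink, translator, policy and outputs stay as in the question.

```json
{
    "name": "Copy data1",
    "type": "Copy",
    "typeProperties": {
        "source": {
            "type": "DelimitedTextSource",
            "storeSettings": {
                "type": "AzureBlobStorageReadSettings",
                "recursive": true,
                "enablePartitionDiscovery": false
            },
            "formatSettings": {
                "type": "DelimitedTextReadSettings"
            }
        }
    },
    "inputs": [
        {
            "referenceName": "employeeinputdataset",
            "type": "DatasetReference",
            "parameters": {
                "fileName": {
                    "value": "@item().name",
                    "type": "Expression"
                }
            }
        }
    ]
}
```

The error message reports the path as employeeinput/workdetail.csv inside the employeeinput container, which may mean that wildcardFolderPath was being treated as a folder within the container rather than as the container itself. Also note that the Get Metadata activity in the question uses the same dataset, so once the dataset has a parameter it would likely need either a value for fileName as well or a separate, unparameterized dataset that points at the container.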