Azure 数据工厂 activity 复制从 foreach activity 中获取多个 table 名称
Azure Data Factory activity copy get multiple table name from foreach activity
使用 Data Factory V2 我正在尝试实现从一个 Azure SQL 数据库到另一个数据库的数据流复制。
我必须将多个 table 合并到一个目标 table.
为此,我创建了一个查找 activity,用于创建要复制的三个 table 的名称。输出 JSON
文件被传递给 foreach activity,应该将每个 table 的数据复制到目标 table,但管道执行不成功。
我在下面报告我的管道代码:
{
"name": "FLD_Item_base",
"properties": {
"activities": [
{
"name": "myLookup",
"type": "Lookup",
"dependsOn": [
{
"activity": "myStoredProcedure",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": "SELECT tsy_company_desc, tsy_tabella_desc, tsy_company_id, \ncase when tsy_company_desc = 'all' \nthen '['+ tsy_tabella_desc + ']' else '['+tsy_company_desc + '$' + tsy_tabella_desc + ']' end as nome_tabella_completo \nfrom dbo.TSY_FLUSSI_LOAD_DWH \nwhere tsy_flusso_desc = 'item_base' \nand tsy_tabella_desc='Item' \n"
},
"dataset": {
"referenceName": "TLD_Item",
"type": "DatasetReference"
},
"firstRowOnly": false
}
},
{
"name": "myStoredProcedure",
"type": "SqlServerStoredProcedure",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"storedProcedureName": "[dbo].[myStoredProcedure]"
},
"linkedServiceName": {
"referenceName": "Sink",
"type": "LinkedServiceReference"
}
},
{
"name": "IterateSQLTables",
"type": "ForEach",
"dependsOn": [
{
"activity": "myLookup",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"items": {
"value": "@activity('myLookup').output",
"type": "Expression"
},
"activities": [
{
"name": "FullCopyActivity",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": {
"value": "SELECT * FROM @{item().value.name_tab}",
"type": "Expression"
}
},
"sink": {
"type": "SqlSink",
"writeBatchSize": 10000
},
"enableStaging": false,
"dataIntegrationUnits": 0
},
"inputs": [
{
"referenceName": "Source",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "TLD_Item",
"type": "DatasetReference"
}
]
}
]
}
}
管道执行returns出现如下错误:
{
"errorCode": "400",
"message": "Activity failed because an inner activity failed",
"failureType": "UserError",
"target": "IterateSQLTables"
}
有谁知道如何解决这个问题?谢谢
请在 activity IterateSQLTables
的 items
中尝试 @activity('myLookup').output.value
而不是 @activity('myLookup').output
。
您可以找到文档进行查找activityhere
使用 Data Factory V2 我正在尝试实现从一个 Azure SQL 数据库到另一个数据库的数据流复制。 我必须将多个 table 合并到一个目标 table.
为此,我创建了一个查找 activity,用于创建要复制的三个 table 的名称。输出 JSON
文件被传递给 foreach activity,应该将每个 table 的数据复制到目标 table,但管道执行不成功。
我在下面报告我的管道代码:
{
"name": "FLD_Item_base",
"properties": {
"activities": [
{
"name": "myLookup",
"type": "Lookup",
"dependsOn": [
{
"activity": "myStoredProcedure",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": "SELECT tsy_company_desc, tsy_tabella_desc, tsy_company_id, \ncase when tsy_company_desc = 'all' \nthen '['+ tsy_tabella_desc + ']' else '['+tsy_company_desc + '$' + tsy_tabella_desc + ']' end as nome_tabella_completo \nfrom dbo.TSY_FLUSSI_LOAD_DWH \nwhere tsy_flusso_desc = 'item_base' \nand tsy_tabella_desc='Item' \n"
},
"dataset": {
"referenceName": "TLD_Item",
"type": "DatasetReference"
},
"firstRowOnly": false
}
},
{
"name": "myStoredProcedure",
"type": "SqlServerStoredProcedure",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"storedProcedureName": "[dbo].[myStoredProcedure]"
},
"linkedServiceName": {
"referenceName": "Sink",
"type": "LinkedServiceReference"
}
},
{
"name": "IterateSQLTables",
"type": "ForEach",
"dependsOn": [
{
"activity": "myLookup",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"items": {
"value": "@activity('myLookup').output",
"type": "Expression"
},
"activities": [
{
"name": "FullCopyActivity",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": {
"value": "SELECT * FROM @{item().value.name_tab}",
"type": "Expression"
}
},
"sink": {
"type": "SqlSink",
"writeBatchSize": 10000
},
"enableStaging": false,
"dataIntegrationUnits": 0
},
"inputs": [
{
"referenceName": "Source",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "TLD_Item",
"type": "DatasetReference"
}
]
}
]
}
}
管道执行returns出现如下错误:
{
"errorCode": "400",
"message": "Activity failed because an inner activity failed",
"failureType": "UserError",
"target": "IterateSQLTables"
}
有谁知道如何解决这个问题?谢谢
请在 activity IterateSQLTables
的 items
中尝试 @activity('myLookup').output.value
而不是 @activity('myLookup').output
。
您可以找到文档进行查找activityhere