遍历文件夹目录中的每个文件并检查日期 Azure Data Factory V2 - 错误代码
Loop over each file in folder directory and check date Azure Data Factory V2 -wrong code
我想遍历 stfp 文件夹中的每个文件并检查它是否是新文件,然后将新文件复制到数据湖上
现在我有下面的代码,但我认为它不正确。在第二个 GetLastModifyfromFile
activity 中没有使用 @item()
来引用循环中的项目最后日期,而是引用一个完全不同的数据集,称为 SrcLocalFile。
{
"name": "IncrementalloadfromSingleFolder",
"properties": {
"activities": [
{
"name": "GetFileList",
"type": "GetMetadata",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false
},
"typeProperties": {
"dataset": {
"referenceName": "SrcLocalDir",
"type": "DatasetReference"
},
"fieldList": [
"childItems"
]
}
},
{
"name": "ForEachFile",
"type": "ForEach",
"dependsOn": [
{
"activity": "GetFileList",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"items": {
"value": "@activity('GetFileList').output.childItems",
"type": "Expression"
},
"activities": [
{
"name": "GetLastModifyfromFile",
"type": "GetMetadata",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false
},
"typeProperties": {
"dataset": {
"referenceName": "SrcLocalFile",
"type": "DatasetReference"
},
"fieldList": [
"lastModified"
]
}
},
{
"name": "IfNewFile",
"type": "IfCondition",
"dependsOn": [
{
"activity": "GetLastModifyfromFile",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"expression": {
"value": "@and(less(activity('GetLastModifyfromFile').output.lastModified, pipeline().parameters.current_time), greaterOrEquals(activity('GetLastModifyfromFile').output.lastModified, pipeline().parameters.last_time))",
"type": "Expression"
},
"ifTrueActivities": [
{
"name": "CopyNewFiles",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false
},
"typeProperties": {
"source": {
"type": "FileSystemSource",
"recursive": false
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"dataIntegrationUnits": 0
},
"inputs": [
{
"referenceName": "SrcLocalFile",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "TgtBooksBlob",
"type": "DatasetReference"
}
]
}
]
}
}
]
}
}
],
"parameters": {
"current_time": {
"type": "String",
"defaultValue": "2018-04-01T00:00:00Z"
},
"last_time": {
"type": "String",
"defaultValue": "2018-03-01T00:00:00Z"
}
},
"folder": {
"name": "IncrementalLoadSingleFolder"
}
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
只是一个想法 - 我没有看到您的数据集定义,但是...
是否应该将路径和文件名作为参数传递给数据集?
即将 2 个参数添加到路径和文件的数据集定义(比如路径参数和文件参数)。在数据集的文件名和文件夹名设置中使用这些参数作为@dataset().pathparam 和@dataset().fileparam。
在上面的代码中,传入参数数据集输入的新 "parameters" 部分,其中 pathparam 和 fileparam 等于您从之前 activity 检索到的文件夹和子项。
注意 - 存在数据集名称中不能包含空格的错误。
我想遍历 stfp 文件夹中的每个文件并检查它是否是新文件,然后将新文件复制到数据湖上
现在我有下面的代码,但我认为它不正确。在第二个 GetLastModifyfromFile
activity 中没有使用 @item()
来引用循环中的项目最后日期,而是引用一个完全不同的数据集,称为 SrcLocalFile。
{
"name": "IncrementalloadfromSingleFolder",
"properties": {
"activities": [
{
"name": "GetFileList",
"type": "GetMetadata",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false
},
"typeProperties": {
"dataset": {
"referenceName": "SrcLocalDir",
"type": "DatasetReference"
},
"fieldList": [
"childItems"
]
}
},
{
"name": "ForEachFile",
"type": "ForEach",
"dependsOn": [
{
"activity": "GetFileList",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"items": {
"value": "@activity('GetFileList').output.childItems",
"type": "Expression"
},
"activities": [
{
"name": "GetLastModifyfromFile",
"type": "GetMetadata",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false
},
"typeProperties": {
"dataset": {
"referenceName": "SrcLocalFile",
"type": "DatasetReference"
},
"fieldList": [
"lastModified"
]
}
},
{
"name": "IfNewFile",
"type": "IfCondition",
"dependsOn": [
{
"activity": "GetLastModifyfromFile",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"expression": {
"value": "@and(less(activity('GetLastModifyfromFile').output.lastModified, pipeline().parameters.current_time), greaterOrEquals(activity('GetLastModifyfromFile').output.lastModified, pipeline().parameters.last_time))",
"type": "Expression"
},
"ifTrueActivities": [
{
"name": "CopyNewFiles",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false
},
"typeProperties": {
"source": {
"type": "FileSystemSource",
"recursive": false
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"dataIntegrationUnits": 0
},
"inputs": [
{
"referenceName": "SrcLocalFile",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "TgtBooksBlob",
"type": "DatasetReference"
}
]
}
]
}
}
]
}
}
],
"parameters": {
"current_time": {
"type": "String",
"defaultValue": "2018-04-01T00:00:00Z"
},
"last_time": {
"type": "String",
"defaultValue": "2018-03-01T00:00:00Z"
}
},
"folder": {
"name": "IncrementalLoadSingleFolder"
}
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
只是一个想法 - 我没有看到您的数据集定义,但是...
是否应该将路径和文件名作为参数传递给数据集?
即将 2 个参数添加到路径和文件的数据集定义(比如路径参数和文件参数)。在数据集的文件名和文件夹名设置中使用这些参数作为@dataset().pathparam 和@dataset().fileparam。
在上面的代码中,传入参数数据集输入的新 "parameters" 部分,其中 pathparam 和 fileparam 等于您从之前 activity 检索到的文件夹和子项。
注意 - 存在数据集名称中不能包含空格的错误。