遍历文件夹目录中的每个文件并检查日期 Azure Data Factory V2 - 错误代码

Loop over each file in folder directory and check date Azure Data Factory V2 -wrong code

我想遍历 stfp 文件夹中的每个文件并检查它是否是新文件,然后将新文件复制到数据湖上 现在我有下面的代码,但我认为它不正确。在第二个 GetLastModifyfromFile activity 中没有使用 @item() 来引用循环中的项目最后日期,而是引用一个完全不同的数据集,称为 SrcLocalFile。

{
"name": "IncrementalloadfromSingleFolder",
"properties": {
    "activities": [
        {
            "name": "GetFileList",
            "type": "GetMetadata",
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false
            },
            "typeProperties": {
                "dataset": {
                    "referenceName": "SrcLocalDir",
                    "type": "DatasetReference"
                },
                "fieldList": [
                    "childItems"
                ]
            }
        },
        {
            "name": "ForEachFile",
            "type": "ForEach",
            "dependsOn": [
                {
                    "activity": "GetFileList",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "typeProperties": {
                "items": {
                    "value": "@activity('GetFileList').output.childItems",
                    "type": "Expression"
                },
                "activities": [
                    {
                        "name": "GetLastModifyfromFile",
                        "type": "GetMetadata",
                        "policy": {
                            "timeout": "7.00:00:00",
                            "retry": 0,
                            "retryIntervalInSeconds": 30,
                            "secureOutput": false
                        },
                        "typeProperties": {
                            "dataset": {
                                "referenceName": "SrcLocalFile",
                                "type": "DatasetReference"
                            },
                            "fieldList": [
                                "lastModified"
                            ]
                        }
                    },
                    {
                        "name": "IfNewFile",
                        "type": "IfCondition",
                        "dependsOn": [
                            {
                                "activity": "GetLastModifyfromFile",
                                "dependencyConditions": [
                                    "Succeeded"
                                ]
                            }
                        ],
                        "typeProperties": {
                            "expression": {
                                "value": "@and(less(activity('GetLastModifyfromFile').output.lastModified, pipeline().parameters.current_time), greaterOrEquals(activity('GetLastModifyfromFile').output.lastModified, pipeline().parameters.last_time))",
                                "type": "Expression"
                            },
                            "ifTrueActivities": [
                                {
                                    "name": "CopyNewFiles",
                                    "type": "Copy",
                                    "policy": {
                                        "timeout": "7.00:00:00",
                                        "retry": 0,
                                        "retryIntervalInSeconds": 30,
                                        "secureOutput": false
                                    },
                                    "typeProperties": {
                                        "source": {
                                            "type": "FileSystemSource",
                                            "recursive": false
                                        },
                                        "sink": {
                                            "type": "BlobSink"
                                        },
                                        "enableStaging": false,
                                        "dataIntegrationUnits": 0
                                    },
                                    "inputs": [
                                        {
                                            "referenceName": "SrcLocalFile",
                                            "type": "DatasetReference"
                                        }
                                    ],
                                    "outputs": [
                                        {
                                            "referenceName": "TgtBooksBlob",
                                            "type": "DatasetReference"
                                        }
                                    ]
                                }
                            ]
                        }
                    }
                ]
            }
        }
    ],
    "parameters": {
        "current_time": {
            "type": "String",
            "defaultValue": "2018-04-01T00:00:00Z"
        },
        "last_time": {
            "type": "String",
            "defaultValue": "2018-03-01T00:00:00Z"
        }
    },
    "folder": {
        "name": "IncrementalLoadSingleFolder"
    }
},
"type": "Microsoft.DataFactory/factories/pipelines"

}

只是一个想法 - 我没有看到您的数据集定义,但是...

是否应该将路径和文件名作为参数传递给数据集?

即将 2 个参数添加到路径和文件的数据集定义(比如路径参数和文件参数)。在数据集的文件名和文件夹名设置中使用这些参数作为@dataset().pathparam 和@dataset().fileparam。

在上面的代码中,传入参数数据集输入的新 "parameters" 部分,其中 pathparam 和 fileparam 等于您从之前 activity 检索到的文件夹和子项。

注意 - 存在数据集名称中不能包含空格的错误。