Select 在 azure adf 管道中查询

Select query in azure adf pipeline

如何通过从存储在 azure blob 中的文件中获取数据来在 azure adf 管道中传递 select 查询。

例如:我在 blob 中的文件(测试)只有一个值

在我的 select 查询中的 adf 管道中,我们应该从 table.

中获取值

SQL 查询:"Select * from emp where id = (need to fetch the value from test)"

可能吗?

注意此答案指的是 Azure 数据工厂 (ADF) v1

v2 支持通过 Set Variable activity.

设置变量

回复 v1, 我认为这不可能直接在管道内实现,因为 Azure 数据工厂并不真正支持这种方式的变量。另一种方法是将文件从 blob 加载到您的另一个查询所在的同一数据库中 运行,创建一个将两者连接起来的存储过程,然后使用 Stored Proc Activity,例如:

SELECT * 
FROM emp 
WHERE id = ( SELECT id FROM yourBlobStagingTable );

希望这是有道理的。

原始答案针对的是 ADF 的 v1,当时无法实现此功能。

自 ADF v2(GA 2018 年 6 月)起,现在可以通过下面演示的方式传递参数。所以现在下面的管道定义使这成为可能。

你想要的是 Lookup 从 blob 存储中获取一个值。然后,您可以使用 @concat 字符串函数将其用作查询中的参数。如何从 blob 中获取正确的值取决于您的设计。您可以为此使用查找或元数据。

这是一个示例管道。

{
"name": "pipeline1",
"properties": {
    "activities": [
        {
            "name": "get_variable_from_file",
            "type": "Lookup",
            "dependsOn": [],
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "userProperties": [],
            "typeProperties": {
                "source": {
                    "type": "DelimitedTextSource",
                    "storeSettings": {
                        "type": "AzureBlobFSReadSettings",
                        "recursive": true,
                        "enablePartitionDiscovery": false
                    },
                    "formatSettings": {
                        "type": "DelimitedTextReadSettings",
                        "skipLineCount": 1
                    }
                },
                "dataset": {
                    "referenceName": "blob_storage",
                    "type": "DatasetReference"
                },
                "firstRowOnly": true
            }
        },
        {
            "name": "use_variable_in_query",
            "type": "Copy",
            "dependsOn": [
                {
                    "activity": "get_variable_from_file",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "userProperties": [],
            "typeProperties": {
                "source": {
                    "type": "AzurePostgreSqlSource",
                    "query": {
                        "value": "@concat('select * from schema.emp where id=',activity('get_variable_from_file').output)",
                        "type": "Expression"
                    }
                },
                "enableStaging": false
            },
            "inputs": [
                {
                    "referenceName": "postgres_database",
                    "type": "DatasetReference",
                    "parameters": {
                        "table_name": "emp"
                    }
                }
            ]
        }
    ],
    "annotations": []
}

}