如何在 Azure 数据工厂的 DataFlow 中获取管道的元数据?想要创建调试管道

How to get meta data of the pipeline in DataFlow of Azure data factory? Want to create a debug pipeline

我有一个任务需要创建一个仪表板,该仪表板将提供 azure datafactory 中 运行 管道的详细信息。为此,我想将数据加载到 SQL。目的是测试任何数据流的性能。管道将是一个即插即用模型,我们只需拖放要测试的数据流,建立连接并在每个管道 运行,从管道中获取以下数据。以下是我要填写的列名:

**Sr No:**               Auto Increment, Primary key
**pipeline ID:**         <ID of pipeline run>
**Dataflow Start Time:** ddmmyy HH:SS
**Dataflow end time:**   ddmmyy HH:SS
**Duration to acquire compute:** In seconds
**Data input:**           Size of data read from source
**Data output:**          Size of data written to sink
**Run type:**             Triggered / Manual(Debug) run
**DataFlow Status:** Failed / Success

我可以查看元数据 blade 以获取源的元数据。但是我似乎无法找到资源来获取其他信息。如果您知道如何在 Azure 数据工厂中创建这样的管道,请提供帮助。

您可以使用 WebActivity 调用 API 并获取详细信息,稍后将其复制到 SQL 接收器。

这是一个例子:

  1. Pipeline Runs - Query By Factory

https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DataFactory/factories/{factoryName}/queryPipelineRuns?api-version=2018-06-01

替换占位符中的实际值。

Output
{
    "value": [
        {
            "id": "/SUBSCRIPTIONS/B83C1EE3-C5B6-44FB-B5BA-2A83A074C23F/RESOURCEGROUPS/myrg/PROVIDERS/MICROSOFT.DATAFACTORY/FACTORIES/KTESTADF/pipelineruns/c619e850-xxxx-xxxx-xxxx-4b73dcacb9d8",
            "runId": "c619e850-xxxx-xxxx-xxxx-4b73dcacb9d8",
            "debugRunId": null,
            "runGroupId": "c619e850-xxxx-xxxx-8d0c-4b73dcacb9d8",
            "pipelineName": "partiion_parquets",
            "parameters": {},
            "invokedBy": {
                "id": "216e5917354a41b983f82d8ceda53789",
                "name": "Manual",
                "invokedByType": "Manual"
            },
            "runStart": "2021-10-12T13:00:12.9095561Z",
            "runEnd": "2021-10-12T13:03:17.9824334Z",
            "durationInMs": 185072,
            "status": "Succeeded",
            "message": "",
            "output": null,
            "lastUpdated": "2021-10-12T13:03:17.9824334Z",
            "annotations": [],
            "runDimension": {},
            "isLatest": true
        }
    ],
    "ADFWebActivityResponseHeaders": {
        "Pragma": "no-cache",
        "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
        "X-Content-Type-Options": "nosniff",
        "x-ms-ratelimit-remaining-subscription-reads": "11999",
        "x-ms-request-id": "4373f002-b352-4824-857b-15f5325241ae",
        "x-ms-correlation-request-id": "4373f002-b352-4824-857b-15f5325241ae",
        "x-ms-routing-request-id": "CENTRALUS:20211012T131047Z:4373f002-b352-4824-857b-15f5325241ae",
        "Cache-Control": "no-cache",
        "Date": "Tue, 12 Oct 2021 13:10:46 GMT",
        "Server": "Microsoft-IIS/10.0",
        "X-Powered-By": "ASP.NET",
        "Content-Length": "717",
        "Content-Type": "application/json; charset=utf-8",
        "Expires": "-1"
    },
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (Central US)",
    "executionDuration": 0,
    "durationInQueue": {
        "integrationRuntimeQueue": 0
    },
    "billingReference": {
        "activityType": "ExternalActivity",
        "billableDuration": [
            {
                "meterType": "AzureIR",
                "duration": 0.016666666666666666,
                "unit": "Hours"
            }
        ]
    }
}

接下来例如,使用先前输出的管道 运行 ID,调用 api 以获取数据流详细信息。

  1. Activity Runs - Query By Pipeline Run

https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DataFactory/factories/{factoryName}/pipelineruns/{runId}/queryActivityruns?api-version=2018-06-01

Output
{
    "value": [
        {
            "activityRunEnd": "2021-10-12T13:03:17.1874105Z",
            "activityName": "Data flow1",
            "activityRunStart": "2021-10-12T13:00:14.7523815Z",
            "activityType": "ExecuteDataFlow",
            "durationInMs": 182435,
            "retryAttempt": null,
            "error": {
                "errorCode": "",
                "message": "",
                "failureType": "",
                "target": "Data flow1",
                "details": ""
            },
            "activityRunId": "5e7aece7-5fb7-4ffd-939e-d01d3fcb494a",
            "iterationHash": "",
            "input": {
                "dataflow": {
                    "referenceName": "generateParquet",
                    "type": "DataFlowReference",
                    "parameters": {},
                    "datasetParameters": {
                        "source1": {},
                        "sink1": {}
                    }
                },
                "staging": {},
                "compute": {
                    "coreCount": 8,
                    "computeType": "General"
                },
                "traceLevel": "Fine",
                "dataFlowETag": "48019f9b-0000-0300-0000-616475e20000"
            },
            "linkedServiceName": "",
            "output": {
                "runStatus": {
                    "computeAcquisitionDuration": 107335,
                    "version": "20211011.2",
                    "profile": {
                        "source1": {
                            "computed": [],
                            "lineage": {},
                            "dropped": 0,
                            "drifted": 0,
                            "newer": 16,
                            "total": 16,
                            "updated": 0
                        },
                        "sink1": {
                            "computed": [],
                            "lineage": {
                                "CompanyName": {
                                    "mapped": true,
                                    "from": [
                                        {
                                            "source": "source1",
                                            "columns": [
                                                "CompanyName"
                                            ]
                                        }
                                    ]
                                },
                                "LastName": {
                                    "mapped": true,
                                    "from": [
                                        {
                                            "source": "source1",
                                            "columns": [
                                                "LastName"
                                            ]
                                        }
                                    ]
                                },
                                "MiddleName": {
                                    "mapped": true,
                                    "from": [
                                        {
                                            "source": "source1",
                                            "columns": [
                                                "MiddleName"
                                            ]
                                        }
                                    ]
                                },
                                "PasswordSalt": {
                                    "mapped": true,
                                    "from": [
                                        {
                                            "source": "source1",
                                            "columns": [
                                                "PasswordSalt"
                                            ]
                                        }
                                    ]
                                },
                                "ModifiedDate": {
                                    "mapped": true,
                                    "from": [
                                        {
                                            "source": "source1",
                                            "columns": [
                                                "ModifiedDate"
                                            ]
                                        }
                                    ]
                                },
                                "path": {
                                    "mapped": true,
                                    "from": [
                                        {
                                            "source": "source1",
                                            "columns": [
                                                "path"
                                            ]
                                        }
                                    ]
                                },
                                "rowguid": {
                                    "mapped": true,
                                    "from": [
                                        {
                                            "source": "source1",
                                            "columns": [
                                                "rowguid"
                                            ]
                                        }
                                    ]
                                },
                                "Title": {
                                    "mapped": true,
                                    "from": [
                                        {
                                            "source": "source1",
                                            "columns": [
                                                "Title"
                                            ]
                                        }
                                    ]
                                },
                                "NameStyle": {
                                    "mapped": true,
                                    "from": [
                                        {
                                            "source": "source1",
                                            "columns": [
                                                "NameStyle"
                                            ]
                                        }
                                    ]
                                },
                                "CustomerID": {
                                    "mapped": true,
                                    "from": [
                                        {
                                            "source": "source1",
                                            "columns": [
                                                "CustomerID"
                                            ]
                                        }
                                    ]
                                },
                                "FirstName": {
                                    "mapped": true,
                                    "from": [
                                        {
                                            "source": "source1",
                                            "columns": [
                                                "FirstName"
                                            ]
                                        }
                                    ]
                                },
                                "Suffix": {
                                    "mapped": true,
                                    "from": [
                                        {
                                            "source": "source1",
                                            "columns": [
                                                "Suffix"
                                            ]
                                        }
                                    ]
                                },
                                "Phone": {
                                    "mapped": true,
                                    "from": [
                                        {
                                            "source": "source1",
                                            "columns": [
                                                "Phone"
                                            ]
                                        }
                                    ]
                                },
                                "PasswordHash": {
                                    "mapped": true,
                                    "from": [
                                        {
                                            "source": "source1",
                                            "columns": [
                                                "PasswordHash"
                                            ]
                                        }
                                    ]
                                },
                                "SalesPerson": {
                                    "mapped": true,
                                    "from": [
                                        {
                                            "source": "source1",
                                            "columns": [
                                                "SalesPerson"
                                            ]
                                        }
                                    ]
                                },
                                "EmailAddress": {
                                    "mapped": true,
                                    "from": [
                                        {
                                            "source": "source1",
                                            "columns": [
                                                "EmailAddress"
                                            ]
                                        }
                                    ]
                                }
                            },
                            "dropped": 0,
                            "drifted": 0,
                            "newer": 0,
                            "total": 16,
                            "updated": 16
                        }
                    },
                    "metrics": {
                        "sink1": {
                            "format": "parquet",
                            "stages": [
                                {
                                    "stage": 1,
                                    "partitionTimes": [
                                        525
                                    ],
                                    "lastUpdateTime": "2021-10-12 13:02:50.453",
                                    "bytesWritten": 0,
                                    "bytesRead": 199491,
                                    "partitionStatus": "Success",
                                    "streams": {
                                        "source1": {
                                            "count": 847,
                                            "cached": false,
                                            "totalPartitions": 1,
                                            "partitionStatus": "Success",
                                            "partitionCounts": [
                                                847
                                            ],
                                            "type": "source"
                                        }
                                    },
                                    "target": "sink1",
                                    "time": 640,
                                    "progressState": "Completed"
                                },
                                {
                                    "stage": 2,
                                    "partitionTimes": [
                                        3436,
                                        3373,
                                        3441,
                                        3475,
                                        2304,
                                        2321,
                                        2364,
                                        2291,
                                        1989,
                                        2435
                                    ],
                                    "lastUpdateTime": "2021-10-12 13:02:58.897",
                                    "bytesWritten": 177728,
                                    "bytesRead": 0,
                                    "partitionStatus": "Success",
                                    "streams": {
                                        "sink1": {
                                            "count": 847,
                                            "cached": false,
                                            "totalPartitions": 10,
                                            "partitionStatus": "Success",
                                            "partitionCounts": [
                                                84,
                                                85,
                                                85,
                                                85,
                                                85,
                                                85,
                                                85,
                                                85,
                                                84,
                                                84
                                            ],
                                            "type": "sink"
                                        }
                                    },
                                    "target": "sink1",
                                    "time": 8452,
                                    "progressState": "Completed"
                                }
                            ],
                            "sinkPostProcessingTime": 0,
                            "data": [],
                            "store": "blob",
                            "rowsWritten": 847,
                            "details": {
                                "postCommandsDuration": [
                                    3
                                ],
                                "preCommandsDuration": [
                                    1
                                ]
                            },
                            "sources": {
                                "source1": {
                                    "rowsRead": 847,
                                    "store": "blob",
                                    "details": {
                                        "fileSystemInitDuration": [
                                            7751
                                        ]
                                    },
                                    "format": "delimited"
                                }
                            },
                            "sinkProcessingTime": 12380
                        }
                    },
                    "dsl": "\nsource() ~> source1\n\nsource1 sink() ~> sink1"
                },
                "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (Central US)",
                "billingReference": {
                    "activityType": "executedataflow",
                    "billableDuration": [
                        {
                            "meterType": "General",
                            "duration": 0.3691165737777778,
                            "unit": "coreHour",
                            "sessionType": "JobCluster"
                        }
                    ]
                },
                "reportLineageToPurview": {
                    "status": "NotReported"
                }
            },
            "userProperties": {},
            "pipelineName": "partiion_parquets",
            "pipelineRunId": "c619e850-c7de-4d1f-8d0c-4b73dcacb9d8",
            "status": "Succeeded",
            "recoveryStatus": "None",
            "integrationRuntimeNames": [
                "defaultintegrationruntime"
            ],
            "executionDetails": {
                "integrationRuntime": [
                    {
                        "name": "AutoResolveIntegrationRuntime",
                        "type": "Managed",
                        "computeType": "General",
                        "coreCount": "8"
                    }
                ]
            },
            "id": "/SUBSCRIPTIONS/B83C1EE3-xxxx-xxxx-B5BA-2E83A074C23K/RESOURCEGROUPS/myrg/PROVIDERS/MICROSOFT.DATAFACTORY/FACTORIES/KTESTADF/pipelineruns/c619e850-xxxx-xxxx-xxxx-4b73dcacb9d8/activityruns/5e7aece7-xxxx-4ffd-939e-d01d3fcb494a"
        }
    ],
    "ADFWebActivityResponseHeaders": {
        "Pragma": "no-cache",
        "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
        "X-Content-Type-Options": "nosniff",
        "x-ms-ratelimit-remaining-subscription-reads": "11999",
        "x-ms-request-id": "e8fa08f2-594a-4464-b43e-d71104c1950c",
        "x-ms-correlation-request-id": "e8fa08f2-594a-4464-b43e-d71104c1950c",
        "x-ms-routing-request-id": "CENTRALUS:20211012T131050Z:e8fa08f2-594a-4464-b43e-d71104c1950c",
        "Cache-Control": "no-cache",
        "Date": "Tue, 12 Oct 2021 13:10:49 GMT",
        "Server": "Microsoft-IIS/10.0",
        "X-Powered-By": "ASP.NET",
        "Content-Length": "4307",
        "Content-Type": "application/json; charset=utf-8",
        "Expires": "-1"
    },
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (Central US)",
    "executionDuration": 0,
    "durationInQueue": {
        "integrationRuntimeQueue": 0
    },
    "billingReference": {
        "activityType": "ExternalActivity",
        "billableDuration": [
            {
                "meterType": "AzureIR",
                "duration": 0.016666666666666666,
                "unit": "Hours"
            }
        ]
    }
}

从上面你可以筛选 "activityType": "ExecuteDataFlow" , durationInMs, activityRunStart, activityRunEnd, invokedByType, input, output 和其他指标。

只需按照 Azure Data Factory version 2 (V2) REST API operations,参数化所选输出并在必要时存储在变量中。使用 Add dynamic content 构建 URL 和 JSON 适当地请求正文。