如何在 Azure 数据工厂的 DataFlow 中获取管道的元数据?想要创建调试管道
How to get meta data of the pipeline in DataFlow of Azure data factory? Want to create a debug pipeline
我有一个任务,需要创建一个仪表板,用于展示 Azure 数据工厂中管道运行的详细信息。为此,我想将数据加载到 SQL。目的是测试任意数据流的性能。该管道将采用即插即用的模式:我们只需拖放要测试的数据流、建立连接,然后在每次管道运行时从管道中获取以下数据。以下是我要填充的列名:
**Sr No:** Auto Increment, Primary key
**pipeline ID:** <ID of pipeline run>
**Dataflow Start Time:** ddmmyy HH:SS
**Dataflow end time:** ddmmyy HH:SS
**Duration to acquire compute:** In seconds
**Data input:** Size of data read from source
**Data output:** Size of data written to sink
**Run type:** Triggered / Manual(Debug) run
**DataFlow Status:** Failed / Success
我可以通过元数据 blade 查看源的元数据,但是找不到获取其他信息的方法。如果您知道如何在 Azure 数据工厂中创建这样的管道,请提供帮助。
您可以使用 Web Activity 调用 API 获取详细信息,然后再将其复制到 SQL 接收器。
这是一个例子:
https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DataFactory/factories/{factoryName}/queryPipelineRuns?api-version=2018-06-01
将占位符替换为实际值。
Output
{
"value": [
{
"id": "/SUBSCRIPTIONS/B83C1EE3-C5B6-44FB-B5BA-2A83A074C23F/RESOURCEGROUPS/myrg/PROVIDERS/MICROSOFT.DATAFACTORY/FACTORIES/KTESTADF/pipelineruns/c619e850-xxxx-xxxx-xxxx-4b73dcacb9d8",
"runId": "c619e850-xxxx-xxxx-xxxx-4b73dcacb9d8",
"debugRunId": null,
"runGroupId": "c619e850-xxxx-xxxx-8d0c-4b73dcacb9d8",
"pipelineName": "partiion_parquets",
"parameters": {},
"invokedBy": {
"id": "216e5917354a41b983f82d8ceda53789",
"name": "Manual",
"invokedByType": "Manual"
},
"runStart": "2021-10-12T13:00:12.9095561Z",
"runEnd": "2021-10-12T13:03:17.9824334Z",
"durationInMs": 185072,
"status": "Succeeded",
"message": "",
"output": null,
"lastUpdated": "2021-10-12T13:03:17.9824334Z",
"annotations": [],
"runDimension": {},
"isLatest": true
}
],
"ADFWebActivityResponseHeaders": {
"Pragma": "no-cache",
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
"X-Content-Type-Options": "nosniff",
"x-ms-ratelimit-remaining-subscription-reads": "11999",
"x-ms-request-id": "4373f002-b352-4824-857b-15f5325241ae",
"x-ms-correlation-request-id": "4373f002-b352-4824-857b-15f5325241ae",
"x-ms-routing-request-id": "CENTRALUS:20211012T131047Z:4373f002-b352-4824-857b-15f5325241ae",
"Cache-Control": "no-cache",
"Date": "Tue, 12 Oct 2021 13:10:46 GMT",
"Server": "Microsoft-IIS/10.0",
"X-Powered-By": "ASP.NET",
"Content-Length": "717",
"Content-Type": "application/json; charset=utf-8",
"Expires": "-1"
},
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (Central US)",
"executionDuration": 0,
"durationInQueue": {
"integrationRuntimeQueue": 0
},
"billingReference": {
"activityType": "ExternalActivity",
"billableDuration": [
{
"meterType": "AzureIR",
"duration": 0.016666666666666666,
"unit": "Hours"
}
]
}
}
接下来,例如使用上一步输出中的管道运行 ID(runId),调用 API 以获取数据流详细信息。
https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DataFactory/factories/{factoryName}/pipelineruns/{runId}/queryActivityruns?api-version=2018-06-01
Output
{
"value": [
{
"activityRunEnd": "2021-10-12T13:03:17.1874105Z",
"activityName": "Data flow1",
"activityRunStart": "2021-10-12T13:00:14.7523815Z",
"activityType": "ExecuteDataFlow",
"durationInMs": 182435,
"retryAttempt": null,
"error": {
"errorCode": "",
"message": "",
"failureType": "",
"target": "Data flow1",
"details": ""
},
"activityRunId": "5e7aece7-5fb7-4ffd-939e-d01d3fcb494a",
"iterationHash": "",
"input": {
"dataflow": {
"referenceName": "generateParquet",
"type": "DataFlowReference",
"parameters": {},
"datasetParameters": {
"source1": {},
"sink1": {}
}
},
"staging": {},
"compute": {
"coreCount": 8,
"computeType": "General"
},
"traceLevel": "Fine",
"dataFlowETag": "48019f9b-0000-0300-0000-616475e20000"
},
"linkedServiceName": "",
"output": {
"runStatus": {
"computeAcquisitionDuration": 107335,
"version": "20211011.2",
"profile": {
"source1": {
"computed": [],
"lineage": {},
"dropped": 0,
"drifted": 0,
"newer": 16,
"total": 16,
"updated": 0
},
"sink1": {
"computed": [],
"lineage": {
"CompanyName": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"CompanyName"
]
}
]
},
"LastName": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"LastName"
]
}
]
},
"MiddleName": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"MiddleName"
]
}
]
},
"PasswordSalt": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"PasswordSalt"
]
}
]
},
"ModifiedDate": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"ModifiedDate"
]
}
]
},
"path": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"path"
]
}
]
},
"rowguid": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"rowguid"
]
}
]
},
"Title": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"Title"
]
}
]
},
"NameStyle": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"NameStyle"
]
}
]
},
"CustomerID": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"CustomerID"
]
}
]
},
"FirstName": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"FirstName"
]
}
]
},
"Suffix": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"Suffix"
]
}
]
},
"Phone": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"Phone"
]
}
]
},
"PasswordHash": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"PasswordHash"
]
}
]
},
"SalesPerson": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"SalesPerson"
]
}
]
},
"EmailAddress": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"EmailAddress"
]
}
]
}
},
"dropped": 0,
"drifted": 0,
"newer": 0,
"total": 16,
"updated": 16
}
},
"metrics": {
"sink1": {
"format": "parquet",
"stages": [
{
"stage": 1,
"partitionTimes": [
525
],
"lastUpdateTime": "2021-10-12 13:02:50.453",
"bytesWritten": 0,
"bytesRead": 199491,
"partitionStatus": "Success",
"streams": {
"source1": {
"count": 847,
"cached": false,
"totalPartitions": 1,
"partitionStatus": "Success",
"partitionCounts": [
847
],
"type": "source"
}
},
"target": "sink1",
"time": 640,
"progressState": "Completed"
},
{
"stage": 2,
"partitionTimes": [
3436,
3373,
3441,
3475,
2304,
2321,
2364,
2291,
1989,
2435
],
"lastUpdateTime": "2021-10-12 13:02:58.897",
"bytesWritten": 177728,
"bytesRead": 0,
"partitionStatus": "Success",
"streams": {
"sink1": {
"count": 847,
"cached": false,
"totalPartitions": 10,
"partitionStatus": "Success",
"partitionCounts": [
84,
85,
85,
85,
85,
85,
85,
85,
84,
84
],
"type": "sink"
}
},
"target": "sink1",
"time": 8452,
"progressState": "Completed"
}
],
"sinkPostProcessingTime": 0,
"data": [],
"store": "blob",
"rowsWritten": 847,
"details": {
"postCommandsDuration": [
3
],
"preCommandsDuration": [
1
]
},
"sources": {
"source1": {
"rowsRead": 847,
"store": "blob",
"details": {
"fileSystemInitDuration": [
7751
]
},
"format": "delimited"
}
},
"sinkProcessingTime": 12380
}
},
"dsl": "\nsource() ~> source1\n\nsource1 sink() ~> sink1"
},
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (Central US)",
"billingReference": {
"activityType": "executedataflow",
"billableDuration": [
{
"meterType": "General",
"duration": 0.3691165737777778,
"unit": "coreHour",
"sessionType": "JobCluster"
}
]
},
"reportLineageToPurview": {
"status": "NotReported"
}
},
"userProperties": {},
"pipelineName": "partiion_parquets",
"pipelineRunId": "c619e850-c7de-4d1f-8d0c-4b73dcacb9d8",
"status": "Succeeded",
"recoveryStatus": "None",
"integrationRuntimeNames": [
"defaultintegrationruntime"
],
"executionDetails": {
"integrationRuntime": [
{
"name": "AutoResolveIntegrationRuntime",
"type": "Managed",
"computeType": "General",
"coreCount": "8"
}
]
},
"id": "/SUBSCRIPTIONS/B83C1EE3-xxxx-xxxx-B5BA-2E83A074C23K/RESOURCEGROUPS/myrg/PROVIDERS/MICROSOFT.DATAFACTORY/FACTORIES/KTESTADF/pipelineruns/c619e850-xxxx-xxxx-xxxx-4b73dcacb9d8/activityruns/5e7aece7-xxxx-4ffd-939e-d01d3fcb494a"
}
],
"ADFWebActivityResponseHeaders": {
"Pragma": "no-cache",
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
"X-Content-Type-Options": "nosniff",
"x-ms-ratelimit-remaining-subscription-reads": "11999",
"x-ms-request-id": "e8fa08f2-594a-4464-b43e-d71104c1950c",
"x-ms-correlation-request-id": "e8fa08f2-594a-4464-b43e-d71104c1950c",
"x-ms-routing-request-id": "CENTRALUS:20211012T131050Z:e8fa08f2-594a-4464-b43e-d71104c1950c",
"Cache-Control": "no-cache",
"Date": "Tue, 12 Oct 2021 13:10:49 GMT",
"Server": "Microsoft-IIS/10.0",
"X-Powered-By": "ASP.NET",
"Content-Length": "4307",
"Content-Type": "application/json; charset=utf-8",
"Expires": "-1"
},
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (Central US)",
"executionDuration": 0,
"durationInQueue": {
"integrationRuntimeQueue": 0
},
"billingReference": {
"activityType": "ExternalActivity",
"billableDuration": [
{
"meterType": "AzureIR",
"duration": 0.016666666666666666,
"unit": "Hours"
}
]
}
}
从上面的输出中,你可以筛选出 "activityType": "ExecuteDataFlow" 的活动,并提取 durationInMs、activityRunStart、activityRunEnd、invokedByType(位于第一个响应的 invokedBy 中)、input、output 以及其他指标。
只需参照 Azure Data Factory version 2 (V2) REST API operations 文档,将所需的输出参数化,并在必要时存储到变量中。使用 Add dynamic content(添加动态内容)构建合适的 URL 和 JSON 请求正文。
我有一个任务,需要创建一个仪表板,用于展示 Azure 数据工厂中管道运行的详细信息。为此,我想将数据加载到 SQL。目的是测试任意数据流的性能。该管道将采用即插即用的模式:我们只需拖放要测试的数据流、建立连接,然后在每次管道运行时从管道中获取以下数据。以下是我要填充的列名:
**Sr No:** Auto Increment, Primary key
**pipeline ID:** <ID of pipeline run>
**Dataflow Start Time:** ddmmyy HH:SS
**Dataflow end time:** ddmmyy HH:SS
**Duration to acquire compute:** In seconds
**Data input:** Size of data read from source
**Data output:** Size of data written to sink
**Run type:** Triggered / Manual(Debug) run
**DataFlow Status:** Failed / Success
我可以通过元数据 blade 查看源的元数据,但是找不到获取其他信息的方法。如果您知道如何在 Azure 数据工厂中创建这样的管道,请提供帮助。
您可以使用 Web Activity 调用 API 获取详细信息,然后再将其复制到 SQL 接收器。
这是一个例子:
https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DataFactory/factories/{factoryName}/queryPipelineRuns?api-version=2018-06-01
将占位符替换为实际值。
Output
{
"value": [
{
"id": "/SUBSCRIPTIONS/B83C1EE3-C5B6-44FB-B5BA-2A83A074C23F/RESOURCEGROUPS/myrg/PROVIDERS/MICROSOFT.DATAFACTORY/FACTORIES/KTESTADF/pipelineruns/c619e850-xxxx-xxxx-xxxx-4b73dcacb9d8",
"runId": "c619e850-xxxx-xxxx-xxxx-4b73dcacb9d8",
"debugRunId": null,
"runGroupId": "c619e850-xxxx-xxxx-8d0c-4b73dcacb9d8",
"pipelineName": "partiion_parquets",
"parameters": {},
"invokedBy": {
"id": "216e5917354a41b983f82d8ceda53789",
"name": "Manual",
"invokedByType": "Manual"
},
"runStart": "2021-10-12T13:00:12.9095561Z",
"runEnd": "2021-10-12T13:03:17.9824334Z",
"durationInMs": 185072,
"status": "Succeeded",
"message": "",
"output": null,
"lastUpdated": "2021-10-12T13:03:17.9824334Z",
"annotations": [],
"runDimension": {},
"isLatest": true
}
],
"ADFWebActivityResponseHeaders": {
"Pragma": "no-cache",
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
"X-Content-Type-Options": "nosniff",
"x-ms-ratelimit-remaining-subscription-reads": "11999",
"x-ms-request-id": "4373f002-b352-4824-857b-15f5325241ae",
"x-ms-correlation-request-id": "4373f002-b352-4824-857b-15f5325241ae",
"x-ms-routing-request-id": "CENTRALUS:20211012T131047Z:4373f002-b352-4824-857b-15f5325241ae",
"Cache-Control": "no-cache",
"Date": "Tue, 12 Oct 2021 13:10:46 GMT",
"Server": "Microsoft-IIS/10.0",
"X-Powered-By": "ASP.NET",
"Content-Length": "717",
"Content-Type": "application/json; charset=utf-8",
"Expires": "-1"
},
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (Central US)",
"executionDuration": 0,
"durationInQueue": {
"integrationRuntimeQueue": 0
},
"billingReference": {
"activityType": "ExternalActivity",
"billableDuration": [
{
"meterType": "AzureIR",
"duration": 0.016666666666666666,
"unit": "Hours"
}
]
}
}
接下来,例如使用上一步输出中的管道运行 ID(runId),调用 API 以获取数据流详细信息。
https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DataFactory/factories/{factoryName}/pipelineruns/{runId}/queryActivityruns?api-version=2018-06-01
Output
{
"value": [
{
"activityRunEnd": "2021-10-12T13:03:17.1874105Z",
"activityName": "Data flow1",
"activityRunStart": "2021-10-12T13:00:14.7523815Z",
"activityType": "ExecuteDataFlow",
"durationInMs": 182435,
"retryAttempt": null,
"error": {
"errorCode": "",
"message": "",
"failureType": "",
"target": "Data flow1",
"details": ""
},
"activityRunId": "5e7aece7-5fb7-4ffd-939e-d01d3fcb494a",
"iterationHash": "",
"input": {
"dataflow": {
"referenceName": "generateParquet",
"type": "DataFlowReference",
"parameters": {},
"datasetParameters": {
"source1": {},
"sink1": {}
}
},
"staging": {},
"compute": {
"coreCount": 8,
"computeType": "General"
},
"traceLevel": "Fine",
"dataFlowETag": "48019f9b-0000-0300-0000-616475e20000"
},
"linkedServiceName": "",
"output": {
"runStatus": {
"computeAcquisitionDuration": 107335,
"version": "20211011.2",
"profile": {
"source1": {
"computed": [],
"lineage": {},
"dropped": 0,
"drifted": 0,
"newer": 16,
"total": 16,
"updated": 0
},
"sink1": {
"computed": [],
"lineage": {
"CompanyName": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"CompanyName"
]
}
]
},
"LastName": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"LastName"
]
}
]
},
"MiddleName": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"MiddleName"
]
}
]
},
"PasswordSalt": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"PasswordSalt"
]
}
]
},
"ModifiedDate": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"ModifiedDate"
]
}
]
},
"path": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"path"
]
}
]
},
"rowguid": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"rowguid"
]
}
]
},
"Title": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"Title"
]
}
]
},
"NameStyle": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"NameStyle"
]
}
]
},
"CustomerID": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"CustomerID"
]
}
]
},
"FirstName": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"FirstName"
]
}
]
},
"Suffix": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"Suffix"
]
}
]
},
"Phone": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"Phone"
]
}
]
},
"PasswordHash": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"PasswordHash"
]
}
]
},
"SalesPerson": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"SalesPerson"
]
}
]
},
"EmailAddress": {
"mapped": true,
"from": [
{
"source": "source1",
"columns": [
"EmailAddress"
]
}
]
}
},
"dropped": 0,
"drifted": 0,
"newer": 0,
"total": 16,
"updated": 16
}
},
"metrics": {
"sink1": {
"format": "parquet",
"stages": [
{
"stage": 1,
"partitionTimes": [
525
],
"lastUpdateTime": "2021-10-12 13:02:50.453",
"bytesWritten": 0,
"bytesRead": 199491,
"partitionStatus": "Success",
"streams": {
"source1": {
"count": 847,
"cached": false,
"totalPartitions": 1,
"partitionStatus": "Success",
"partitionCounts": [
847
],
"type": "source"
}
},
"target": "sink1",
"time": 640,
"progressState": "Completed"
},
{
"stage": 2,
"partitionTimes": [
3436,
3373,
3441,
3475,
2304,
2321,
2364,
2291,
1989,
2435
],
"lastUpdateTime": "2021-10-12 13:02:58.897",
"bytesWritten": 177728,
"bytesRead": 0,
"partitionStatus": "Success",
"streams": {
"sink1": {
"count": 847,
"cached": false,
"totalPartitions": 10,
"partitionStatus": "Success",
"partitionCounts": [
84,
85,
85,
85,
85,
85,
85,
85,
84,
84
],
"type": "sink"
}
},
"target": "sink1",
"time": 8452,
"progressState": "Completed"
}
],
"sinkPostProcessingTime": 0,
"data": [],
"store": "blob",
"rowsWritten": 847,
"details": {
"postCommandsDuration": [
3
],
"preCommandsDuration": [
1
]
},
"sources": {
"source1": {
"rowsRead": 847,
"store": "blob",
"details": {
"fileSystemInitDuration": [
7751
]
},
"format": "delimited"
}
},
"sinkProcessingTime": 12380
}
},
"dsl": "\nsource() ~> source1\n\nsource1 sink() ~> sink1"
},
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (Central US)",
"billingReference": {
"activityType": "executedataflow",
"billableDuration": [
{
"meterType": "General",
"duration": 0.3691165737777778,
"unit": "coreHour",
"sessionType": "JobCluster"
}
]
},
"reportLineageToPurview": {
"status": "NotReported"
}
},
"userProperties": {},
"pipelineName": "partiion_parquets",
"pipelineRunId": "c619e850-c7de-4d1f-8d0c-4b73dcacb9d8",
"status": "Succeeded",
"recoveryStatus": "None",
"integrationRuntimeNames": [
"defaultintegrationruntime"
],
"executionDetails": {
"integrationRuntime": [
{
"name": "AutoResolveIntegrationRuntime",
"type": "Managed",
"computeType": "General",
"coreCount": "8"
}
]
},
"id": "/SUBSCRIPTIONS/B83C1EE3-xxxx-xxxx-B5BA-2E83A074C23K/RESOURCEGROUPS/myrg/PROVIDERS/MICROSOFT.DATAFACTORY/FACTORIES/KTESTADF/pipelineruns/c619e850-xxxx-xxxx-xxxx-4b73dcacb9d8/activityruns/5e7aece7-xxxx-4ffd-939e-d01d3fcb494a"
}
],
"ADFWebActivityResponseHeaders": {
"Pragma": "no-cache",
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
"X-Content-Type-Options": "nosniff",
"x-ms-ratelimit-remaining-subscription-reads": "11999",
"x-ms-request-id": "e8fa08f2-594a-4464-b43e-d71104c1950c",
"x-ms-correlation-request-id": "e8fa08f2-594a-4464-b43e-d71104c1950c",
"x-ms-routing-request-id": "CENTRALUS:20211012T131050Z:e8fa08f2-594a-4464-b43e-d71104c1950c",
"Cache-Control": "no-cache",
"Date": "Tue, 12 Oct 2021 13:10:49 GMT",
"Server": "Microsoft-IIS/10.0",
"X-Powered-By": "ASP.NET",
"Content-Length": "4307",
"Content-Type": "application/json; charset=utf-8",
"Expires": "-1"
},
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (Central US)",
"executionDuration": 0,
"durationInQueue": {
"integrationRuntimeQueue": 0
},
"billingReference": {
"activityType": "ExternalActivity",
"billableDuration": [
{
"meterType": "AzureIR",
"duration": 0.016666666666666666,
"unit": "Hours"
}
]
}
}
从上面的输出中,你可以筛选出 "activityType": "ExecuteDataFlow" 的活动,并提取 durationInMs、activityRunStart、activityRunEnd、invokedByType(位于第一个响应的 invokedBy 中)、input、output 以及其他指标。
只需参照 Azure Data Factory version 2 (V2) REST API operations 文档,将所需的输出参数化,并在必要时存储到变量中。使用 Add dynamic content(添加动态内容)构建合适的 URL 和 JSON 请求正文。