如何为 Azure 数据工厂 v2 中的运行手册执行配置 Webhook activity?

How to configure Webhook activity for runbooks execution in Azure Data Factory v2?

我正在研究来自 ADF 的 运行 运行手册(powershell 和图形)。我发现完成此任务的方法之一是使用 webhook。我将并行和串联运行手册 运行(如果对以前的运行手册存在依赖性)。

总体而言,

经过一些研究,我找到了一些关于需要在 webhook 主体中添加(或以某种方式生成)的回调 uri。我怎样才能得到这个回调uri?如果我没有添加正确的回调 uri,那么 activity 会一直运行到超时。我相信当 runbook 运行 成功执行时,webhook activity 的功能应该完成,这样我们就可以继续管道中的下一个 webhook activity。我也试过 web activity 但这是同样的问题。

我现在用的本体就在json下面。

{"body":{"myMessage":"Sample"}}

我参考了:

https://vanishedgradient.com/2019/04/25/webhooks-with-azure-data-factory/

https://mrpaulandrew.com/2019/06/18/azure-data-factory-web-hook-vs-web-activity/

https://social.msdn.microsoft.com/Forums/en-US/2effcefb-e65b-4d5c-8b01-138c95126b79/in-azure-data-factory-v2-how-to-process-azure-analysis-service-cube?forum=AzureDataFactory

我不确定这是否是最佳做法,但我有一些在 Powershell Workflow Runbook 中工作的东西。

如果 runbook 定义了 webhook,则使用 webhookdata 参数。您的请求正文需要采用 JSON 格式,并且 $WebhookData 参数会选择它。例如假设你的 webhook activity 中的 Body 看起来像这样:

{"MyParam":1, "MyOtherParam":"Hello"}

在您的 Runbook 中,您以这种方式获取参数:

Param([object]$WebhookData)

if($WebhookData){
    $parameters=(ConvertFrom-Json -InputObject $WebhookData.RequestBody)
    if($parameters.MyParam) {$ParamOne = $parameters.MyParam} 
    if($parameters.MyOtherParam) {$ParamTwo = $parameters.MyOtherParam} 
}

运行手册 $ParamOne 和 $ParamTwo 中的变量是从解析的 JSON 正文字符串中填充的。数据工厂自动将 callBackUri 附加到 Body 字符串。您不需要创建它。

您必须使用 $WebhookData 名称。这是一个定义 属性。

希望对您有所帮助。

感谢您提供链接,它们是有用的资源。我已经设法让这个调用 运行book 来调整 azure 分析服务大小的管道工作。 运行这本书 return 失败和成功信息没有很好的记录。

这里有一些代码可以提供一些帮助,我从几个地方获取了这些代码,但很多来自未解决的问题 (https://github.com/MicrosoftDocs/azure-docs/issues/43897) on this Microsoft page: https://docs.microsoft.com/en-us/azure/data-factory/control-flow-webhook-activity

datafactory Webhook activity 传入一些 "Headers",SourceHost 即@pipeline().DataFactory 和 SourceProcess 即@pipeline().Pipeline。这是为了让我们可以进行一些检查,以确认 运行 这本书正在 运行 通过可接受的流程。

调用的Body就是我们需要的其他变量:

@json(concat('{"AnalysisServer":"', pipeline().parameters.AASName, '", "MinimumSKU":"', pipeline().parameters.SKU,'"}') )

您的 运行图书需要 WebhookData 参数

param
(
    [Parameter (Mandatory=$false)]
    [object] $WebhookData
)

然后您可以获取所需的所有位,包括检查是否提供了回调:

if ($WebhookData)
{

    # Split apart the WebhookData
    $WebhookName     =     $WebhookData.WebhookName
    $WebhookHeaders  =     $WebhookData.RequestHeader
    $WebhookBody     =     $WebhookData.RequestBody | Convertfrom-Json
    $WebhookADF      =     $WebhookHeaders.SourceHost
    $WebhookPipeline =     $WebhookHeaders.SourceProcess

Write-Output -InputObject ('Runbook started through webhook {0} called by {1} on {2}.' -f $WebhookName, $WebhookPipeline, $WebhookADF)

# if there's a callBackURI then we've been called by something that is waiting for a response
If ($WebhookBody.callBackUri)
{
    $WebhookCallbackURI =  $WebhookBody.callBackUri
}

...
}

变量 $WebHookHeaders: @{Connection=Keep-Alive; Expect=100-continue; Host=sXXevents.azure-automation.net; SourceHost=**MYDATAFACTORYNAME**; SourceProcess=**MYPIPELINENAME**; x-ms-request-id=**UNIQUEIDENTIFIER**}

然后您可以从 json body 中获取信息:$AzureAnalysisServerName = $WebHookBody.AnalysisServer

将 error/failure 传回您的 运行 书籍相对容易,请注意,我将 success/update 消息放入 $Message 并且只有 $ErrorMessage 中有内容一个错误:

$ErrorMessage = "Failed to do stuff I wanted" 

if ($ErrorMessage) 
{

    $Output = [ordered]@{ output= @{
        AzureAnalysisServerResize = "Failed" }
        error = @{
            ErrorCode = "ResizeError"
            Message = $ErrorMessage
        }
        statusCode = "500"
    }
} else {
    $Output = [ordered]@{ 
        output= @{ 
            "AzureAnalysisServerResize" = "Success"
            "message" = $Outputmessage 
        }
        statusCode = "200"
    }
}

$OutputJson = $Output | ConvertTo-Json -Depth 10

# if we have a callbackuri let the ADF Webhook activity know that the script is complete 
# Otherwise it waits until its timeout
If ($WebhookCallBackURI)
{
    $WebhookCallbackHeaders = @{
        "Content-Type"="application/json"
    }
    Invoke-WebRequest -UseBasicParsing -Uri $WebhookCallBackURI -Method Post -Body $OutputJson -Header $WebhookCallbackHeaders
}

然后我结束 if ($WebhookData) { 调用 else 说如果不是从 webhook 调用 运行book 不应该 运行ning:

} else {
    Write-Error -Message 'Runbook was not started from Webhook' -ErrorAction stop
}

传回错误消息很容易,传回成功消息却很痛苦,但上面的方法似乎有效,并且在我的数据工厂管道中我可以访问结果。

Output
{
    "message": "Analysis Server MYSERVERNAME which is SKU XX is already at or above required SKU XX.",
    "AzureAnalysisServerResize": "Success"
}

请注意,对于 Invoke-WebRequest,一些在线示例未指定 -UseBasicParsing 但我们不得不指定 运行book 抱怨:Invoke-WebRequest:无法解析响应内容因为 Internet Explorer 引擎不可用,或者 Internet Explorer 的 first-launch 配置不完整。

抱歉延迟。几个月前我找到了完整的解决方案。感谢 Nick 和 Sara 添加这些片段。我使用了与 return 代码类似的代码。我们使用的是允许有限更改的图形运行手册,因此我只是在运行手册的末尾添加了 return 代码 (Powershell),几乎没有影响。我插入了以下代码:

if ($WebhookData)
{
    Write-Output $WebhookData
    $parameters = (ConvertFrom-Json -InputObject $WebhookData.RequestBody)
    if ($parameters.callBackUri)
    {
        $callbackuri = $parameters.callBackUri
    }
}


if ($callbackuri)
{
    Invoke-WebRequest -Uri $callbackuri -UseBasicParsing -Method POST
}

Write-Output $callbackuri

在此之后,我使用 Runbook 中提供的 "Input and Output" 按钮添加了一个输入参数。我将输入参数命名为 "WebhookData" 并键入 "Object"。输入参数的名称区分大小写,并应与 Powershell 代码中使用的参数匹配。

这解决了我的问题。 Runbook 在从 ADF 管道调用时启动,仅当 webhook 调用的基础 Runbook 完成时才移动到下一个管道。