Stream Analytics: Best parameters for auto-pausing a day-wise TUMBLINGWINDOW stream job, and the best trigger time to set for that function

Context

I have a day-wise TUMBLINGWINDOW query (similar to the one below):

SELECT
    DATEADD(day, -1, System.Timestamp()) AS WindowStart,
    System.Timestamp() AS WindowEnd, 
    TollId, 
    COUNT(*)
FROM Input TIMESTAMP BY EntryTime  
GROUP BY TumblingWindow(day, 1), TollId
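To make the window semantics concrete, here is a small Python sketch. The function names are hypothetical, and Python is used only so the logic can be run locally. It assigns each event to a one-day tumbling window aligned to 00:00 UTC and counts events per TollId. It assumes ASA's documented default that a window excludes its start and includes its end, so an event stamped exactly 00:00 belongs to the day that has just ended. The practical consequence is that the window ending at 00:00 UTC can only be emitted while the job is running at or after that instant.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

DAY = timedelta(days=1)


def window_end(ts: datetime) -> datetime:
    """End of the 1-day tumbling window that contains `ts` (timezone-aware).

    Windows are aligned to 00:00 UTC and, as assumed here, exclude their start
    and include their end, so an event at exactly 00:00 closes the previous day.
    """
    ts = ts.astimezone(timezone.utc)
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight if ts == midnight else midnight + DAY


def tumbling_day_counts(events):
    """Count (entry_time, toll_id) events per (WindowStart, WindowEnd, TollId)."""
    counts = Counter()
    for entry_time, toll_id in events:
        end = window_end(entry_time)  # plays the role of System.Timestamp()
        counts[(end - DAY, end, toll_id)] += 1  # WindowStart = DATEADD(day, -1, end)
    return dict(counts)
```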

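I have been reading the auto-pause documentation and have started following its steps. I have a test stream job and a function app that hosts all the settings for the auto-pause PowerShell code. This lets me experiment on the separate test job without affecting the real one. The PowerShell code is unchanged apart from the parameter values. I haven't actually started the test stream job yet; I plan to do that once I have a clearer idea of which parameters and trigger time to use for the auto-pause.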

Here is a previous Stack Overflow post that I created. It gives more background, both on how the start time works and on what I want to achieve:

Post explaining how start time works with specific examples as to how I'd want the job to pause

Summary of Background Scenario in other posts

The aim is to let the stream job run once a day, long enough for each day's TUMBLINGWINDOW output to be emitted with the full day's worth of data. To allow enough time for this, I was thinking the job could stay off for most of the day (from 00:30 UTC). It would turn on at 23:30 UTC and catch up on the day's backlog (00:00-23:30 UTC). The day-wise window would then output at 00:00 UTC. The job would switch off afterwards, say at 00:30 UTC, having had enough time to emit the output. This cycle would then repeat daily.
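The timeline above can be sanity-checked with a little arithmetic. The Python sketch below uses a hypothetical helper, with all times in UTC. For a given start/stop pair, it computes how long the job is on and off each day. It also checks that the daily window close at 00:00 falls strictly inside the on-period. For 23:30 to 00:30 it returns 60 minutes on and 1380 minutes off, matching N and M below. The window closes 30 minutes after the start, which leaves 30 minutes for the output. Swapping the two times would put 00:00 in the off-period.

```python
from datetime import time

MINUTES_PER_DAY = 24 * 60


def minute_of_day(t: time) -> int:
    return t.hour * 60 + t.minute


def schedule_summary(start: time, stop: time, window_close: time = time(0, 0)):
    """Return (minutes_on, minutes_off, minutes_from_start_to_window_close).

    The on-period runs from `start` to `stop` (UTC) and may wrap past midnight.
    Raises ValueError if the daily window close is not strictly inside it.
    """
    on = (minute_of_day(stop) - minute_of_day(start)) % MINUTES_PER_DAY
    off = MINUTES_PER_DAY - on
    close_offset = (minute_of_day(window_close) - minute_of_day(start)) % MINUTES_PER_DAY
    if not 0 < close_offset < on:
        raise ValueError("the window closes while the job is off")
    return on, off, close_offset
```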

My question

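Do the main parameters I have chosen (shown below) match my intent, as described above? If so, how should I set the trigger time of the function app so that this code runs as those parameters intend?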

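Should I set the trigger to run the script at 23:30 and at 00:30, since those are the two points where it needs to start and stop the job? (The documentation mentions this is done with a CRON expression.)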

# This snippet is taken from the auto-pause doc linked above

# Set my own values in minutes based on above discussion
$restartThresholdMinute = 1380 # This is M (1380 min = 23 hours, i.e. time left off from 00:30 to 23:30 UTC)
$stopThresholdMinute = 60 # This is N (60 min = 1 hour, i.e. time left on from 23:30 to 00:30 UTC)

# Left at their defaults, following the doc's current advice
$maxInputBacklog = 0 # The amount of backlog we tolerate when stopping the job (in event count, 0 is a good starting point)
$maxWatermark = 10 # The amount of watermark we tolerate when stopping the job (in seconds, 10 is a good starting point at low SUs)
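To see what M and N actually control, here is a Python paraphrase of the two rules from the auto-pause article. Stopped jobs restart after M minutes. Running jobs stop after N minutes, once backlog and watermark are healthy. This is a simplified sketch rather than the article's code, and the argument names are assumptions. Neither rule looks at the wall clock: M and N are measured from the last state change. The job therefore only lines up with 23:30 and 00:30 if the cycle happens to start at the right moment and never slips.

```python
def autopause_action(state, minutes_in_state, backlog_sum, watermark_avg_s,
                     M, N, max_backlog=0, max_watermark_s=10):
    """Return 'start', 'stop', 'wait' or 'ignore' for one timer invocation.

    Simplified paraphrase of the article's rules; the real script derives
    minutes_in_state from the activity log and the metrics from Azure Monitor.
    """
    if state == "Stopped":
        # Restart purely on elapsed time since the last stop (M minutes).
        return "start" if minutes_in_state >= M else "wait"
    if state == "Running":
        # Stop only after N minutes of running, and only while metrics are healthy.
        healthy = 0 <= backlog_sum <= max_backlog and watermark_avg_s <= max_watermark_s
        return "stop" if minutes_in_state >= N and healthy else "wait"
    # Starting, Stopping, Failed, ...: leave the job alone.
    return "ignore"
```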

Side point:

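If my parameters are not a good starting choice, what would you suggest instead? (Keep in mind the main constraints discussed in the Context section.)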

Edit: update 2022-03-16

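@Florian I have some ideas based on my understanding of your post, but I'm not sure of the best way to handle this. It would be great if you could add an adapted version of the code to your answer. Here is my attempt so far: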

# Input bindings are passed in via param block.
Param($Timer)

# Stop on error
$ErrorActionPreference = 'stop'

# Write an information log with the current time.
$currentUTCtime = (Get-Date).ToUniversalTime()
$currentUTCstringtime = Get-Date -Date $currentUTCtime -UFormat %R  # nishcs edit: Getting the 24hour UTC time format as a string
Write-Host "asaRobotPause - PowerShell timer trigger function is starting at time: $currentUTCtime"

# Set variables
[string]$restartTime = $env:restartTime # nishcs edit: set this to '23:30'. These could in fact be hard-coded (keeping them as function app settings is probably better practice, not sure).
[string]$stopTime = $env:stopTime # nishcs edit: set this to '00:30'. Same note as above about hard-coding vs. app settings.

$maxInputBacklog = $env:maxInputBacklog
$maxWatermark = $env:maxWatermark

$restartThresholdMinute = $env:restartThresholdMinute
$stopThresholdMinute = $env:stopThresholdMinute

$subscriptionId = $env:subscriptionId
$resourceGroupName = $env:resourceGroupName
$asaJobName = $env:asaJobName

$resourceId = "/subscriptions/$($subscriptionId)/resourceGroups/$($resourceGroupName)/providers/Microsoft.StreamAnalytics/streamingjobs/$($asaJobName)"

# Check if managed identity has been enabled and granted access to a subscription, resource group, or resource
$AzContext = Get-AzContext -ErrorAction SilentlyContinue
if (-not $AzContext.Subscription.Id)
{
    Throw ("Managed identity is not enabled for this app or it has not been granted access to any Azure resources. Please see https://docs.microsoft.com/en-us/azure/app-service/overview-managed-identity for additional details.")
}

try
{
    # throw "This is an error."
    
    # Check current ASA job status
    $currentJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotPause - Job $($asaJobName) is $($currentJobState)."

    # Switch state
    if ($currentJobState -eq "Running")
    { 
        # Get-AzActivityLog issues warnings about deprecation coming in future releases, here we ignore them via -WarningAction Ignore
        # We check the last 1000 records of history to make sure we're not missing what we're looking for. This may need adjusting for a job that logs a lot.
        # There is a bug in Get-AzActivityLog that triggers an error when Select-Object First is in the same pipeline (on the same line). We move it down.
        $startTimeStamp = Get-AzActivityLog -ResourceId $resourceId -MaxRecord 1000 -WarningAction Ignore | Where-Object {$_.EventName.Value -like "Start Job*"}
        $startTimeStamp = $startTimeStamp | Select-Object -First 1 | Foreach-Object {$_.EventTimeStamp}

        # Get-AzMetric issues warnings about deprecation coming in future releases, here we ignore them via -WarningAction Ignore
        $currentBacklog = Get-AzMetric -ResourceId $resourceId -TimeGrain 00:01:00 -MetricName "InputEventsSourcesBacklogged" -DetailedOutput -WarningAction Ignore
        $currentWatermark = Get-AzMetric -ResourceId $resourceId -TimeGrain 00:01:00 -MetricName "OutputWatermarkDelaySeconds" -DetailedOutput -WarningAction Ignore

        # Metrics always lag 1-3 minutes behind, so grabbing the last N minutes actually means checking N+3. This may be overly safe and can be fine-tuned per job.
        $Backlog =  $currentBacklog.Data | `
                        Where-Object {$_.Maximum -ge 0} | `
                        Sort-Object -Property Timestamp -Descending | `
                        Where-Object {$_.Timestamp -ge $startTimeStamp} | `
                        Select-Object -First $stopThresholdMinute | 
                        Measure-Object -Sum Maximum
        $BacklogSum = $Backlog.Sum

        $Watermark = $currentWatermark.Data | `
                        Where-Object {$_.Maximum -ge 0} | `
                        Sort-Object -Property Timestamp -Descending | `
                        Where-Object {$_.Timestamp -ge $startTimeStamp} | `
                        Select-Object -First $stopThresholdMinute | `
                        Measure-Object -Average Maximum
        $WatermarkAvg = [int]$Watermark.Average

        Write-Output "asaRobotPause - Job $($asaJobName) is running since $($startTimeStamp) with a sum of $($BacklogSum) backlogged events, and an average watermark of $($WatermarkAvg) sec, for $($Watermark.Count) minutes."

        # nishcs edit: Conditions no longer reliant on the M and N minute. Just on the predefined start/ stop time that have been set.
        if (
            ($currentUTCstringtime -eq $stopTime)
            )
        {
            Write-Output "asaRobotPause - Job $($asaJobName) is stopping..."
            Stop-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName
        }
        else {
            Write-Output "asaRobotPause - Job $($asaJobName) is not stopping yet; it will be stopped when the function runs at $($stopTime) UTC."
        }
    }

    elseif ($currentJobState -eq "Stopped")
    {
        # Get-AzActivityLog issues warnings about deprecation coming in future releases, here we ignore them via -WarningAction Ignore
        # We check the last 1000 records of history to make sure we're not missing what we're looking for. This may need adjusting for a job that logs a lot.
        # There is a bug in Get-AzActivityLog that triggers an error when Select-Object First is in the same pipeline (on the same line). We move it down.
        $stopTimeStamp = Get-AzActivityLog -ResourceId $resourceId -MaxRecord 1000 -WarningAction Ignore | Where-Object {$_.EventName.Value -like "Stop Job*"}
        $stopTimeStamp = $stopTimeStamp | Select-Object -First 1 | Foreach-Object {$_.EventTimeStamp}

        # Get-Date returns a local time, we project it to the same time zone (universal) as the result of Get-AzActivityLog that we extracted above
        $minutesSinceStopped = ((Get-Date).ToUniversalTime()- $stopTimeStamp).TotalMinutes

        # nishcs edit: Conditions no longer reliant on the M and N minute. Just on the predefined start/ stop time that have been set. 
        if ($currentUTCstringtime -eq $restartTime)
        {
            Write-Output "asaRobotPause - Job $($asaJobName) was paused $([int]$minutesSinceStopped) minutes ago, set interval is $($restartThresholdMinute), it is now starting..."
            Start-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName -OutputStartMode LastOutputEventTime
        }
        else{
            Write-Output "asaRobotPause - Job $($asaJobName) was paused $([int]$minutesSinceStopped) minutes ago, set interval is $($restartThresholdMinute), it will not be restarted yet."
        }
    }
    else {
        Write-Output "asaRobotPause - Job $($asaJobName) is not in a state I can manage: $($currentJobState). Let's wait a bit, but consider helping if that doesn't go away!"
    }

    # Final ASA job status check
    $newJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotPause - Job $($asaJobName) was $($currentJobState), is now $($newJobState). Job completed."
}
catch
{
    throw $_.Exception.Message
}
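The edited script only acts when `Get-Date -UFormat %R` (an `HH:MM` string) equals the configured time. It therefore works only if an invocation lands inside that exact minute. The Python sketch below uses hypothetical helper names. It contrasts that exact match with a tolerance window, which still matches if the invocation is a few minutes late or the timer runs every few minutes. This is one possible alternative, and it assumes the caller also guards against acting twice. The script already provides that guard by checking the job state first.

```python
from datetime import datetime, timedelta


def matches_exact_minute(now: datetime, target: str) -> bool:
    """Mirrors `(Get-Date -UFormat %R) -eq $stopTime`: true only in the same HH:MM."""
    return now.strftime("%H:%M") == target


def within_window(now: datetime, target: str, tolerance: timedelta) -> bool:
    """True if `now` is at most `tolerance` after the most recent `target` time (HH:MM)."""
    hh, mm = map(int, target.split(":"))
    today = now.replace(hour=hh, minute=mm, second=0, microsecond=0)
    # Also consider yesterday's occurrence so 23:55 still matches shortly after midnight.
    for candidate in (today, today - timedelta(days=1)):
        if timedelta(0) <= now - candidate <= tolerance:
            return True
    return False
```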

I think you need different scheduling logic from the one described in the article.

From the article:

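  • Stopped jobs are restarted after M minutes
  • Running jobs are stopped any time after N minutes, as long as their backlog and watermark metrics are healthy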

I think you need:

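  • Stopped jobs are restarted at 23:30
  • Running jobs are stopped at 00:30 (you can still check the watermark, but if you give the job enough time this is probably unnecessary)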
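If you do want the optional watermark check on the 00:30 stop, one way to structure it is a deadline plus a hard cutoff. The Python sketch below is an illustration built on assumptions. `watermark_samples` stands in for recent `OutputWatermarkDelaySeconds` values, and the thresholds are placeholders to tune. At or after the stop time, the job is stopped once the latest watermark delay is small enough. After `give_up_after` it is stopped unconditionally, so it can never stay on indefinitely.

```python
from datetime import datetime, timedelta


def should_stop(now, stop_at, watermark_samples, max_watermark_s, give_up_after):
    """Decide whether to stop the job on this invocation.

    watermark_samples: list of (timestamp, watermark_delay_seconds), most recent last.
    """
    if now < stop_at:
        return False  # too early: the day's window may not have been emitted yet
    if now - stop_at >= give_up_after:
        return True  # hard cutoff, regardless of metrics
    if not watermark_samples:
        return False  # no metric data yet; try again on the next invocation
    _, latest_delay = watermark_samples[-1]
    return latest_delay <= max_watermark_s
```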

The simplest way to implement your use case is to create two simple scripts, one to start the job and one to stop it. In terms of triggers:

  • Azure Functions: you need something like
    • start at 0 30 23 * * *
    • stop at 0 30 0 * * *
  • Azure Automation: a schedule trigger
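Azure Functions timer triggers use six-field NCRONTAB expressions (`{second} {minute} {hour} {day} {month} {day-of-week}`), which are evaluated in UTC by default. The Python sketch below shows when each expression fires next. Its helper is hypothetical and only understands the fixed daily shape used here; it is not a general NCRONTAB parser. It is a quick way to confirm that the start expression lands at 23:30 and the stop expression at 00:30.

```python
from datetime import datetime, timedelta


def next_daily_fire(expr: str, after: datetime) -> datetime:
    """Next fire time, strictly after `after`, of a 'second minute hour * * *' expression."""
    fields = expr.split()
    if len(fields) != 6 or fields[3:] != ["*", "*", "*"]:
        raise ValueError("expected 'second minute hour * * *'")
    second, minute, hour = (int(f) for f in fields[:3])
    candidate = after.replace(hour=hour, minute=minute, second=second, microsecond=0)
    if candidate <= after:
        candidate += timedelta(days=1)  # today's slot has passed; fire tomorrow
    return candidate
```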

Let me know if you need help adapting the code.

Posting here for completeness. These are modified scripts that stop and start the job at specific times.

This is based on @Florian's suggestion.

Method 1: Function app approach

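If you plan to use a function app to host the code, you can create two separate functions in a single function app: one to stop the stream job and one to restart it. The PowerShell code (run.ps1) for each function is below. Add the functions' parameters in the function app's Configuration section, and pull them into the script with environment-variable syntax ($env:<name>).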
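Because `$env:<name>` quietly returns `$null` when a setting is missing, a typo in the Configuration section tends to show up later as a confusing cmdlet error. A fail-fast check helps. The Python sketch below shows the idea; the setting names come from the scripts, and the helper is hypothetical. In PowerShell the equivalent would be to test each value with `[string]::IsNullOrEmpty()` before calling any Az cmdlet.

```python
import os

# Setting names taken from the scripts; adjust to your own app settings.
REQUIRED_SETTINGS = ("subscriptionId", "resourceGroupName", "asaJobName")


def load_settings(env=None, required=REQUIRED_SETTINGS):
    """Return the required settings, failing fast if any are missing or empty."""
    env = os.environ if env is None else env
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise ValueError("missing app settings: " + ", ".join(missing))
    return {name: env[name] for name in required}
```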

Function 1 (restart job): asa-autorestart

<# 
Function for restarting the stream job.
This uses the when last stopped logic to try and ensure no data is missed during the restart process, this can be changed as necessary.
#>

# Input bindings are passed in via param block.
Param($Timer)

# Stop on error
$ErrorActionPreference = 'stop'

# Write an information log with the current time.
$currentUTCtime = (Get-Date).ToUniversalTime()
Write-Host "asaRobotRestart - PowerShell timer trigger function is starting at time: $currentUTCtime"

# Set variables
$resourceGroupName = $env:resourceGroupName
$asaJobName = $env:asaJobName

# Not used in the code, but kept just in case
$subscriptionId = $env:subscriptionId
#$resourceId = "/subscriptions/$($subscriptionId)/resourceGroups/$($resourceGroupName)/providers/Microsoft.StreamAnalytics/streamingjobs/$($asaJobName)"

# Check if managed identity has been enabled and granted access to a subscription, resource group, or resource
$AzContext = Get-AzContext -ErrorAction SilentlyContinue
if (-not $AzContext.Subscription.Id)
{
    Throw ("Managed identity is not enabled for this app or it has not been granted access to any Azure resources. Please see https://docs.microsoft.com/en-us/azure/app-service/overview-managed-identity for additional details.")
}

try
{
    # throw "This is an error."
    
    # Check current ASA job status
    $currentJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotRestart - Job $($asaJobName) is $($currentJobState)."

    if ($currentJobState -eq "Stopped")
    {   
        # Conditions no longer depend on M and N minutes, only on the predefined restart trigger time.
        Write-Output "asaRobotRestart - Job $($asaJobName) is now starting from when it was last stopped..."
        Start-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName -OutputStartMode LastOutputEventTime
        Write-Output "asaRobotRestart - Job $($asaJobName) has been started."
    }
    else {
        Write-Output "asaRobotRestart - Job $($asaJobName) is not in a state I can manage: $($currentJobState). Let's wait a bit, but consider helping if that doesn't go away!"
    }
    # Final ASA job status check
    $newJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotRestart - Job $($asaJobName) was $($currentJobState), is now $($newJobState). Job completed."
}
catch
{
    throw $_.Exception.Message
}

Function 2 (stop job): asa-autostop

<# 
Function for stopping the stream job.
#>

# Input bindings are passed in via param block.
Param($Timer)

# Stop on error
$ErrorActionPreference = 'stop'

# Write an information log with the current time.
$currentUTCtime = (Get-Date).ToUniversalTime()
Write-Host "asaRobotStop - PowerShell timer trigger function is starting at time: $currentUTCtime"

# Set variables
$resourceGroupName = $env:resourceGroupName
$asaJobName = $env:asaJobName

# Not used in the code, but kept just in case
$subscriptionId = $env:subscriptionId
#$resourceId = "/subscriptions/$($subscriptionId)/resourceGroups/$($resourceGroupName)/providers/Microsoft.StreamAnalytics/streamingjobs/$($asaJobName)"

# Check if managed identity has been enabled and granted access to a subscription, resource group, or resource
$AzContext = Get-AzContext -ErrorAction SilentlyContinue
if (-not $AzContext.Subscription.Id)
{
    Throw ("Managed identity is not enabled for this app or it has not been granted access to any Azure resources. Please see https://docs.microsoft.com/en-us/azure/app-service/overview-managed-identity for additional details.")
}

try
{
    # throw "This is an error."
    
    # Check current ASA job status
    $currentJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotStop - Job $($asaJobName) is $($currentJobState)."

    # Switch state
    if ($currentJobState -eq "Running")
    { 
        # Conditions no longer reliant on the M and N minute. Just on the predefined stop trigger time that has been set.
        Write-Output "asaRobotStop - Job $($asaJobName) is stopping..."
        Stop-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName
        Write-Output "asaRobotStop - Job $($asaJobName) has stopped."
    }
    else {
        Write-Output "asaRobotStop - Job $($asaJobName) is not in a state I can manage: $($currentJobState). Let's wait a bit, but consider helping if that doesn't go away!"
    }
    # Final ASA job status check
    $newJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotStop - Job $($asaJobName) was $($currentJobState), is now $($newJobState). Job completed."
}
catch
{
    throw $_.Exception.Message
}
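Both functions are safe to run on a fixed schedule because each one acts only from the single state it can handle. A duplicate or late invocation just logs and exits. The Python sketch below summarizes that guard as a lookup table, using a hypothetical helper. One consequence worth noting: a job in any other state, for example one that has failed, is skipped every day until someone intervenes. It is therefore worth watching for the "not in a state I can manage" log line.

```python
# (action, current JobState) -> what the function does; everything else is skipped.
GUARDED_TRANSITIONS = {
    ("start", "Stopped"): "Start-AzStreamAnalyticsJob -OutputStartMode LastOutputEventTime",
    ("stop", "Running"): "Stop-AzStreamAnalyticsJob",
}


def scheduled_action(action: str, job_state: str) -> str:
    """Describe what asa-autorestart ("start") or asa-autostop ("stop") does."""
    return GUARDED_TRANSITIONS.get((action, job_state), "log 'not in a state I can manage' and exit")
```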

Method 2: Automation account approach

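If you plan to use an Automation account to host the code, you can create two separate runbooks in it: one to stop the stream job and one to restart it. The PowerShell code for each runbook is below. Once a runbook is published, link it to a schedule so that it runs at a specific time, and supply its parameter values there. The script picks them up through its standard Param() block.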

Runbook 1 (restart job): asa-autorestart

#Re-starting job
Param(
    [string]$subscriptionId,
    [string]$resourceGroupName,
    [string]$asaJobName
)
# Stop on error
$ErrorActionPreference = 'stop'
# Write an information log with the current time.
$currentUTCtime = (Get-Date).ToUniversalTime()
Write-Host "asaRobotRestart - runbook is starting at time: $currentUTCtime"
# Set variables
$resourceId = "/subscriptions/$($subscriptionId)/resourceGroups/$($resourceGroupName)/providers/Microsoft.StreamAnalytics/streamingjobs/$($asaJobName)"
# Ensures you do not inherit an AzContext in your runbook
Disable-AzContextAutosave -Scope Process | Out-Null
# Connect using a Managed Service Identity
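try {
    $AzureContext = (Connect-AzAccount -Identity).context
}
catch {
    Write-Output "There is no system-assigned managed identity. Aborting."
    throw
}
# Run the cmdlets below against the subscription passed in as a parameter
$AzureContext = Set-AzContext -SubscriptionId $subscriptionId -DefaultProfile $AzureContext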
try
{
    # throw "This is an error."
    # Check current ASA job status
    $currentJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotRestart - Job $($asaJobName) is $($currentJobState)."
    if ($currentJobState -eq "Stopped")
    {
        # Conditions no longer depend on M and N minutes, only on the predefined restart trigger time.
        Write-Output "asaRobotRestart - Job $($asaJobName) is now starting from when it was last stopped..."
        Start-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName -OutputStartMode LastOutputEventTime
        Write-Output "asaRobotRestart - Job $($asaJobName) has been started."
    }
    else {
        Write-Output "asaRobotRestart - Job $($asaJobName) is not in a state I can manage: $($currentJobState). Let's wait a bit, but consider helping if that doesn't go away!"
    }
    # Final ASA job status check
    $newJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotRestart - Job $($asaJobName) was $($currentJobState), is now $($newJobState). Job completed."
}
catch
{
    throw $_.Exception.Message
}

Runbook 2 (stop job): asa-autostop

# Stopping job
Param(
    [string]$subscriptionId,
    [string]$resourceGroupName,
    [string]$asaJobName
)
# Stop on error
$ErrorActionPreference = 'stop'
# Write an information log with the current time.
$currentUTCtime = (Get-Date).ToUniversalTime()
Write-Host "asaRobotStop - runbook is starting at time: $currentUTCtime"
# Set variables
$resourceId = "/subscriptions/$($subscriptionId)/resourceGroups/$($resourceGroupName)/providers/Microsoft.StreamAnalytics/streamingjobs/$($asaJobName)"
# Ensures you do not inherit an AzContext in your runbook
Disable-AzContextAutosave -Scope Process | Out-Null
# Connect using a Managed Service Identity
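try {
    $AzureContext = (Connect-AzAccount -Identity).context
}
catch {
    Write-Output "There is no system-assigned managed identity. Aborting."
    throw
}
# Run the cmdlets below against the subscription passed in as a parameter
$AzureContext = Set-AzContext -SubscriptionId $subscriptionId -DefaultProfile $AzureContext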
try
{
    # throw "This is an error."
    # Check current ASA job status
    $currentJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotStop - Job $($asaJobName) is $($currentJobState)."
    # Switch state
    if ($currentJobState -eq "Running")
    {
        # Conditions no longer reliant on the M and N minute. Just on the predefined stop trigger time that has been set.
        Write-Output "asaRobotStop - Job $($asaJobName) is stopping..."
        Stop-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName
        Write-Output "asaRobotStop - Job $($asaJobName) has stopped."
    }
    else {
        Write-Output "asaRobotStop - Job $($asaJobName) is not in a state I can manage: $($currentJobState). Let's wait a bit, but consider helping if that doesn't go away!"
    }
    # Final ASA job status check
    $newJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotStop - Job $($asaJobName) was $($currentJobState), is now $($newJobState). Job completed."
}
catch
{
    throw $_.Exception.Message
}