Batching data before passing through the pipeline

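I have a bunch of file shares with millions of files/folders on them. I am using gci -Recurse to get a full list of the directories/files on a share, and I need to load several pieces of information from that gci into a SQL Server for additional analysis. The command I use to get the data is: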

gci $SharePath -Recurse | select FullName, Attributes, Length, CreationTimeUtc, LastAccessTimeUtc, LastWriteTimeUtc

Now I can pipe this to Write-SqlTableData using the recommended syntax to force bulk inserts, as suggested in option 3 on Microsoft's Write-SqlTableData documentation page, like so:

$Params = @{
    ServerInstance = 'sqlservername'
    DatabaseName = 'databasename'
    SchemaName = 'dbo'
}
,(gci $SharePath -Recurse | select FullName, Attributes, Length, CreationTimeUtc, LastAccessTimeUtc, LastWriteTimeUtc) | Write-SqlTableData @Params -TableName 'Table1'

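However, the result is that gci takes several hours to complete with no feedback and uses many GB of memory before finally dumping all of the data to SQL. If I leave off the ,( and the matching ), the data moves to SQL as it is generated, but the SQL Server gets pounded with millions of individual inserts.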
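
To make the difference between the two forms concrete, here is a minimal sketch that counts how many separate inputs a downstream command receives. Measure-PipelineInput is a hypothetical helper written only for this illustration; it stands in for Write-SqlTableData and does not touch SQL.

```powershell
# Hypothetical helper (not part of any module): counts how many times
# its process block runs, i.e. how many separate pipeline inputs it receives.
function Measure-PipelineInput {
    param([Parameter(ValueFromPipeline)] $InputObject)
    begin   { $calls = 0 }
    process { $calls++ }
    end     { $calls }
}

# Without the wrapper, the elements are streamed one by one.
$streamed = 1..5 | Measure-PipelineInput

# With the unary comma, the whole array is sent as a single object.
$wrapped = , (1..5) | Measure-PipelineInput

"streamed: $streamed call(s); wrapped: $wrapped call(s)"
```

The streamed form corresponds to one insert per row, while the wrapped form corresponds to one insert for everything, which can only happen after the entire collection has been built in memory.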

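What I am looking for is an intermediate answer that uses the pipeline. I know I could store the gci results in a variable $gciresults and then use $gciresults[0..999] to pass 1000 rows at a time to SQL and so on, but I am trying to take advantage of the pipeline so that I don't use up too much memory. Ideally there would be some cmdlet, which I'll call batching-cmdlet, that lets me split the incoming data into bite-sized chunks without first storing it all in memory, like so: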

gci ... | select FullName, ... | batching-cmdlet -batchsize 1000 | Write-SqlTableData @Params -TableName 'Table1'

My search for such a cmdlet has come up empty. Does anyone have thoughts on how I could accomplish this?

As of PowerShell 7.0, there is unfortunately no batching (partitioning) mechanism.

  • Adding one to Select-Object has been proposed in this GitHub issue.
  • A custom implementation is available, as the function Select-Chunk.

Therefore, you'll have to implement batching yourself for now:

# Create an aux. queue for batching the objects.
$batchSize = 1000
$batch = [System.Collections.Generic.Queue[pscustomobject]]::new($batchSize)

Get-ChildItem $SharePath -Recurse | 
  Select-Object FullName, Attributes, Length, CreationTimeUtc, LastAccessTimeUtc, LastWriteTimeUtc |
    ForEach-Object { 
      $batch.Enqueue($_) # add object to the batch
      if ($batch.Count -eq $batchSize) { # batch is full, write to table.
        # send batch as a whole through the pipeline
        , $batch | Write-SqlTableData @Params -TableName Table1
        $batch.Clear() # start next batch
      }
    }

# Process any remaining items.
if ($batch.Count) {
  , $batch | Write-SqlTableData @Params -TableName Table1
}
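
If you want to check the batching logic before pointing it at a real server, the same pattern can be run against plain integers. This is only a sketch: Send-Batch is a hypothetical stand-in for Write-SqlTableData that records the size of each batch it receives, and the batch size of 4 is arbitrary.

```powershell
$batchSize = 4
$batch = [System.Collections.Generic.Queue[object]]::new($batchSize)
$sizes = [System.Collections.Generic.List[int]]::new()

# Hypothetical stand-in for Write-SqlTableData: notes how many rows each batch carried.
function Send-Batch {
    param([Parameter(ValueFromPipeline)] $Rows)
    process { $sizes.Add($Rows.Count) }
}

1..10 | ForEach-Object {
    $batch.Enqueue($_)                    # add object to the batch
    if ($batch.Count -eq $batchSize) {    # batch is full
        , $batch | Send-Batch             # send the batch as one object
        $batch.Clear()                    # start next batch
    }
}

# Process any remaining items.
if ($batch.Count) { , $batch | Send-Batch }

$sizes -join ', '   # 4, 4, 2
```

Ten inputs with a batch size of 4 produce two full batches and one partial batch, which is why the final check after the pipeline is needed.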

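Using the framework that @mklement0 outlined in his accepted answer, I wrote the following Split-PipelineData cmdlet, which accepts pipeline input and passes it on in batches of a user-definable size. Note that this turned out to be very similar to the function in the post @mklement0 linked to, but I have also added the ability to report progress using Write-Progress.
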
<#
.Synopsis
    Takes pipeline objects one at a time and sends them on in batches.
.DESCRIPTION
    Takes pipeline objects one at a time and sends them on in batches.  Allows user selectable values for
    batch size and feedback options.
#>
Function Split-PipelineData
{
    [CmdletBinding(DefaultParameterSetName='Default')]
    Param
    (
        # PipelineData
        [Alias('PipelineData')]
        [Parameter(ParameterSetName='Default',Mandatory=$true,ValueFromPipeline=$true,Position=0)]
        [Parameter(ParameterSetName='Progress',Mandatory=$true,ValueFromPipeline=$true,Position=0)]
        $InputObject,

        # Batch size for sending on to the pipeline
        [Parameter(ParameterSetName='Default',Mandatory=$false)]
        [Parameter(ParameterSetName='Progress',Mandatory=$false)]
        [int]$BatchSize=1000,

        # If set, Progress will use Write-Progress to display progress information
        [Parameter(ParameterSetName='Progress',Mandatory=$true)]
        [switch]$Progress,

        # Passthru to Write-Progress ID parameter
        [Parameter(ParameterSetName='Progress',Mandatory=$false)]
        [int]$ProgressID=0,

        # Passthru to Write-Progress ParentID parameter
        [Parameter(ParameterSetName='Progress',Mandatory=$false)]
        [int]$ProgressParentID=-1,

        # Passthru to Write-Progress Activity parameter. Default is 'Batching pipeline data'.
        [Parameter(ParameterSetName='Progress',Mandatory=$false)]
        [string]$ProgressActivity=$null,

        # Report progress after this many records.  Defaults to same as BatchSize
        [Parameter(ParameterSetName='Progress',Mandatory=$false)]
        [int]$ProgressBatchSize=$null,

        # Total Record count (if known) to be used in progress
        [Parameter(ParameterSetName='Progress',Mandatory=$false)]
        [int]$TotalRecords=$null
    )

    Begin
    {
        $Batch = [System.Collections.Generic.Queue[pscustomobject]]::new($BatchSize)
        [int64]$RecordCounter = 0
        If ($Progress)
        {
            $ProgressParams = @{
                Activity = If ($ProgressActivity) {$ProgressActivity} Else {'Batching pipeline data'}
                Status = ''
                ID = $ProgressID
                ParentID = $ProgressParentID
                PercentComplete = -1
            }
            If ($ProgressBatchSize -in $null,0) {$ProgressBatchSize = $BatchSize}
        }
    }
    Process
    {
        $RecordCounter++

        #Add record to batch
        $Batch.Enqueue($InputObject)

        #Report progress if necessary
        If ($Progress -and $RecordCounter % $ProgressBatchSize -eq 0)
        {
            If ($TotalRecords)
            {
                $ProgressParams.Status = "Piping record $RecordCounter/$TotalRecords"
                $ProgressParams.PercentComplete = [int](100*($RecordCounter/$TotalRecords))
            }
            Else
            {
                $ProgressParams.Status = "Piping record $RecordCounter"
            }
            Write-Progress @ProgressParams
        }

        #Pass batch on if it has reached its threshold
        if ($Batch.Count -eq $BatchSize)
        {
            # Emit a copy, so clearing the queue cannot empty a batch the caller has collected
            , $Batch.ToArray()
            $Batch.Clear() # start next batch
        }
    }
    End
    {
        #Report final progress if necessary
        If ($Progress)
        {
            If ($TotalRecords)
            {
                $ProgressParams.Status = "Piping record $RecordCounter/$TotalRecords"
                $ProgressParams.PercentComplete = [int](100)
            }
            Else
            {
                $ProgressParams.Status = "Piping record $RecordCounter"
            }
            Write-Progress @ProgressParams
        }

        #Pass remaining records on (if any) and clear variable
        If ($Batch.Count) { , $Batch.ToArray() }
        $Batch.Clear()
        Remove-Variable Batch

        #Clear progress bars if necessary
        If ($Progress)
        {
            $ProgressParams.Activity = 'Completed'
            If ($ProgressParams.ContainsKey('Status')) {$ProgressParams.Remove('Status')}
            Write-Progress @ProgressParams -Completed
        }
    }
}