PowerShell,使用Start-Job和Start-Process测试异步任务的性能/效率

jv4diomz  于 2023-04-06  发布在  Shell
关注(0)|答案(3)|浏览(318)

我很想在PowerShell中使用Start-ThreadJobStart-JobStart-Process测试异步任务的性能/有用性。我有一个包含大约100个zip文件的文件夹,因此进行了以下测试:

New-Item "000" -ItemType Directory -Force   # Move the old zip files in here
foreach ($i in $zipfiles) {
    $name = $i -split ".zip"
    Start-Job -scriptblock {
        7z.exe x -o"$name" .\$name
        Move-Item $i 000\ -Force
        7z.exe a $i .\$name\*.*
    }
}

这样做的问题是,它会为所有100个zip启动作业,这可能太多了,所以我想设置一个值$numjobs,比如5,我可以改变它,这样只有$numjobs会同时启动,然后脚本将检查所有5个作业在下一个5块开始之前结束。然后,我想根据$numjobs的值来查看CPU和内存
我如何告诉一个循环只运行5次,然后等待作业完成后再继续?
我发现等待工作完成很容易

$jobs = $commands | Foreach-Object { Start-ThreadJob $_ }
$jobs | Receive-Job -Wait -AutoRemoveJobchange

但是我如何等待Start-Process任务结束呢?
虽然我想使用Parallel-ForEach,但我工作的企业将在未来3-4年内与PowerShell 5.1紧密相连,我预计没有机会安装PowerShell 7.x(尽管我很好奇自己在家里的系统上测试Parallel-ForEach以比较所有方法)。

yiytaume

yiytaume1#

ForEach-Object -ParallelStart-ThreadJob具有限制可以同时运行的线程数量的内置功能,这同样适用于Runspace及其RunspacePool,这是两个cmdlet在幕后使用的。
Start-Job不提供这样的功能,因为每个作业都在一个单独的进程中运行,而不是前面提到的在同一进程中的不同线程中运行的cmdlet。我个人也不认为它是并行的替代方案,它非常慢,在大多数情况下线性循环会比它快。在某些情况下,序列化和反序列化也可能是一个问题。

如何限制运行线程数?

这两个cmdlet都为此提供了-ThrottleLimit参数。

  • https://learn.microsoft.com/en-us/powershell/module/threadjob/start-threadjob?view=powershell-7.2#-throttlelimit
  • https://learn.microsoft.com/en-us/powershell/module/microsoft.powershell.core/foreach-object?view=powershell-7.2#-throttlelimit
代码看起来如何?
$dir = (New-Item "000" -ItemType Directory -Force).FullName

# ForEach-Object -Parallel
$zipfiles | ForEach-Object -Parallel {
    $name = [IO.Path]::GetFileNameWithoutExtension($_)
    7z.exe x -o $name .\$name
    Move-Item $_ $using:dir -Force
    7z.exe a $_ .\$name\*.*
} -ThrottleLimit 5

# Start-ThreadJob
$jobs = foreach ($i in $zipfiles) {
    Start-ThreadJob {
        $name = [IO.Path]::GetFileNameWithoutExtension($using:i)
        7z.exe x -o $name .\$name
        Move-Item $using:i $using:dir -Force
        7z.exe a $using:i .\$name\*.*
    } -ThrottleLimit 5
}
$jobs | Receive-Job -Wait -AutoRemoveJob
如何在只有PowerShell 5.1且无法安装新模块的情况下实现相同的功能?

RunspacePool提供了相同的功能,无论是通过它的.SetMaxRunspaces(Int32)方法,还是通过针对RunspaceFactory.CreateRunspacePool重载之一提供maxRunspaces限制作为参数。

代码看起来如何?
$dir   = (New-Item "000" -ItemType Directory -Force).FullName
$limit = 5
$iss   = [initialsessionstate]::CreateDefault2()
$pool  = [runspacefactory]::CreateRunspacePool(1, $limit, $iss, $Host)
$pool.ThreadOptions = [Management.Automation.Runspaces.PSThreadOptions]::ReuseThread
$pool.Open()

$tasks  = foreach ($i in $zipfiles) {
    $ps = [powershell]::Create().AddScript({
        param($path, $dir)

        $name = [IO.Path]::GetFileNameWithoutExtension($path)
        7z.exe x -o $name .\$name
        Move-Item $path $dir -Force
        7z.exe a $path .\$name\*.*
    }).AddParameters(@{ path = $i; dir = $dir })
    $ps.RunspacePool = $pool

    @{ Instance = $ps; AsyncResult = $ps.BeginInvoke() }
}

foreach($task in $tasks) {
    $task['Instance'].EndInvoke($task['AsyncResult'])
    $task['Instance'].Dispose()
}
$pool.Dispose()

请注意,对于所有示例,尚不清楚7zip代码是否正确,此答案试图演示如何在PowerShell中完成异步,而不是如何压缩文件/文件夹。

下面是一个帮助函数,它可以简化并行调用的过程,尝试模拟ForEach-Object -Parallel,并与PowerShell 5.1兼容,尽管不应该被视为一个强大的解决方案

注意This Q&A提供了一个更好和最强大的替代下面的功能。

using namespace System.Management.Automation
using namespace System.Management.Automation.Runspaces
using namespace System.Collections.Generic

function Invoke-Parallel {
    [CmdletBinding()]
    param(
        [Parameter(Mandatory, ValueFromPipeline, DontShow)]
        [object] $InputObject,

        [Parameter(Mandatory, Position = 0)]
        [scriptblock] $ScriptBlock,

        [Parameter()]
        [int] $ThrottleLimit = 5,

        [Parameter()]
        [hashtable] $ArgumentList
    )

    begin {
        $iss = [initialsessionstate]::CreateDefault2()
        if($PSBoundParameters.ContainsKey('ArgumentList')) {
            foreach($argument in $ArgumentList.GetEnumerator()) {
                $iss.Variables.Add([SessionStateVariableEntry]::new($argument.Key, $argument.Value, ''))
            }
        }
        $pool  = [runspacefactory]::CreateRunspacePool(1, $ThrottleLimit, $iss, $Host)
        $tasks = [List[hashtable]]::new()
        $pool.ThreadOptions = [PSThreadOptions]::ReuseThread
        $pool.Open()
    }
    process {
        try {
            $ps = [powershell]::Create().AddScript({
                $args[0].InvokeWithContext($null, [psvariable]::new("_", $args[1]))
            }).AddArgument($ScriptBlock.Ast.GetScriptBlock()).AddArgument($InputObject)

            $ps.RunspacePool = $pool
            $invocationInput = [PSDataCollection[object]]::new(1)
            $invocationInput.Add($InputObject)

            $tasks.Add(@{
                Instance    = $ps
                AsyncResult = $ps.BeginInvoke($invocationInput)
            })
        }
        catch {
            $PSCmdlet.WriteError($_)
        }
    }
    end {
        try {
            foreach($task in $tasks) {
                $task['Instance'].EndInvoke($task['AsyncResult'])
                if($task['Instance'].HadErrors) {
                    $task['Instance'].Streams.Error
                }
                $task['Instance'].Dispose()
            }
        }
        catch {
            $PSCmdlet.WriteError($_)
        }
        finally {
            if($pool) { $pool.Dispose() }
        }
    }
}

它是如何工作的一个例子:

# Hashtable Key becomes the Variable Name inside the Runspace!
$outsideVariables = @{ Message = 'Hello from {0}' }
0..10 | Invoke-Parallel {
    "[Item $_] - " + $message -f [runspace]::DefaultRunspace.InstanceId
    Start-Sleep 5
} -ArgumentList $outsideVariables -ThrottleLimit 3
2vuwiymt

2vuwiymt2#

要添加到Santiago Squarzon's helpful answer
下面是辅助函数Measure-Parallel,它允许您比较以下并行方法的速度:

  • Start-Job
  • 基于子进程:在后台创建子PowerShell进程,这使得这种方法既缓慢又占用资源。
  • Start-ThreadJob-附带 PowerShell(Core)(v6+);可通过Install-Module ThreadJob在Windows PowerShell v5.1中安装:
  • 基于线程:重量比Start-Job轻得多,同时提供相同的功能;另外避免了由于跨进程串行化/反串行化而导致的类型保真度的潜在损失。
  • ForEach-Object-Parallel-仅在PowerShell(Core)7.0+中提供:
  • 基于线程:本质上,Start-ThreadJob是一个简化的 Package 器,支持直接管道输入和直接输出,始终同步整体执行(等待所有启动的线程)。
  • Start-Process
  • 基于子进程:默认情况下异步调用 * 外部程序 *,默认情况下在Windows上的 * 新窗口 * 中。
  • 请注意,这种方法只有在您的并行任务 * 仅 * 包含对 * 外部程序*的 * 单个 * 调用时才有意义,而不是需要执行 * 一段PowerShell代码 *。
  • 值得注意的是,使用这种方法 * 捕获输出 * 的唯一方法是 * 重定向到文件*,总是 * 纯文本 *。

注意事项:

  • 假设下面的测试 Package 了一个对外部可执行文件的调用(比如您的例子中的7z.exe),那么Start-Process方法将执行得最好,因为它没有作业管理的开销。然而,如上所述,这种方法有基本的局限性。
  • 由于其复杂性,圣地亚哥的答案中基于运行空间池的方法没有包括在内;如果您可以使用Start-ThreadJobForEach-Object -Parallel,则不需要采用这种方法。

示例Measure-Parallelism调用,对比了这些方法的运行时性能:

# Run 20 jobs / processes in parallel, 5 at a time, comparing
# all approaches.
# Note: Omit the -Approach argument to enter interactive mode.
Measure-Parallel -Approach All -BatchSize 5 -JobCount 20

运行PowerShell 7.2.6的macOS机器的示例输出(时间因许多因素而异,但比率应提供相对性能的感觉):

# ... output from the jobs

JobCount                         : 20
BatchSize                        : 5
BatchCount                       : 4
Start-Job (secs.)                : 2.20
Start-ThreadJob (secs.)          : 1.17
Start-Process (secs.)            : 0.84
ForEach-Object -Parallel (secs.) : 0.94

结论:

  • ForEach-Object -Parallel增加的线程/作业管理开销最小,其次是Start-ThreadJob
  • Start-Job,由于需要一个 * 额外的 * 子进程-用于运行每个任务的隐藏PowerShell示例-明显较慢。在Windows上,性能差异似乎更加明显。

源代码:**

*重要信息
*函数 * 硬编码 * 示例输入对象以及要调用的外部程序-您必须根据需要自行编辑;硬编码的外部程序在本例中是平台原生shell(在Windows上是cmd.exe,在类Unix平台上是/bin/sh),它被传递一个命令来简单地 echo 每个输入对象。

  • 修改函数以接受脚本块作为参数并通过管道接收作业的输入对象并不太困难(尽管这将排除Start-Process方法,除非您通过PowerShell CLI显式调用该块-但在这种情况下可以使用Start-Job)。
  • 作业/过程 * 输出 * 的内容 * 直接 * 显示 *,无法捕获。
  • 批量大小默认为5,可以通过-BatchSize修改;对于基于线程的方法,批处理大小也用作-ThrottleLimit参数,即允许同时运行的线程数量的限制。默认情况下,运行 * 单个 * 批处理,但您可以通过将并行运行的总数传递给-JobCount间接请求多个批处理
  • 您可以通过数组值-Approach参数选择方法,该参数支持JobThreadJobProcessForEachParallelAll,这些方法组合了前面的所有方法。
  • 如果未指定-Approach,则进入 interactive 模式,在该模式下(重复)提示您选择所需的方法。
  • 除了在交互模式下,输出具有比较计时的自定义对象。
function Measure-Parallel {

  [CmdletBinding()]
  param(
    [ValidateRange(2, 2147483647)] [int] $BatchSize = 5,
    [ValidateSet('Job', 'ThreadJob', 'Process', 'ForEachParallel', 'All')] [string[]] $Approach,
    [ValidateRange(2, 2147483647)] [int] $JobCount = $BatchSize # pass a higher count to run multiple batches
  )

  $noForEachParallel = $PSVersionTable.PSVersion.Major -lt 7
  $noStartThreadJob = -not (Get-Command -ErrorAction Ignore Start-ThreadJob)

  $interactive = -not $Approach
  if (-not $interactive) {
    # Translate the approach arguments into their corresponding hashtable keys (see below).
    if ('All' -eq $Approach) { $Approach = 'Job', 'ThreadJob', 'Process', 'ForEachParallel' }
    $approaches = $Approach.ForEach({
      if ($_ -eq 'ForEachParallel') { 'ForEach-Object -Parallel' }
      else { $_ -replace '^', 'Start-' }
    })
  }

  if ($noStartThreadJob) {
    if ($interactive -or $approaches -contains 'Start-ThreadJob') {
      Write-Warning "Start-ThreadJob is not installed, omitting its test; install it with ``Install-Module ThreadJob``"
      $approaches = $approaches.Where({ $_ -ne 'Start-ThreadJob' })
    }
  }
  if ($noForEachParallel) {
    if ($interactive -or $approaches -contains 'ForEach-Object -Parallel') {
      Write-Warning "ForEach-Object -Parallel is not available in this PowerShell version (requires v7+), omitting its test."
      $approaches = $approaches.Where({ $_ -ne 'ForEach-Object -Parallel' })
    }
  }

  # Simulated input: Create 'f0.zip', 'f1'.zip', ... file names.
  $zipFiles = 0..($JobCount - 1) -replace '^', 'f' -replace '$', '.zip'

  # Sample executables to run - here, the native shell is called to simply 
  # echo the argument given.
  # The external program to invoke.
  $exe = if ($env:OS -eq 'Windows_NT') { 'cmd.exe' } else { 'sh' }
  # The list of its arguments *as a single string* - use '{0}' as the placeholder for where the input object should go.
  $exeArgList = if ($env:OS -eq 'Windows_NT') { '/c "echo {0}"' } else { '-c "echo {0}"' }

  # A hashtable with script blocks that implement the 3 approaches to parallelism.
  $approachImpl = [ordered] @{}

  $approachImpl['Start-Job'] = { # child-process-based job
    param([array] $batch)
    $batch | 
    ForEach-Object {
      Start-Job { Invoke-Expression ($using:exe + ' ' + ($using:exeArgList -f $args[0])) } -ArgumentList $_
    } |
    Receive-Job -Wait -AutoRemoveJob # wait for all jobs, relay their output, then remove them.
  }

  if (-not $noStartThreadJob) {
    # If Start-ThreadJob is available, add an approach for it.
    $approachImpl['Start-ThreadJob'] = { # thread-based job - requires Install-Module ThreadJob in WinPS
      param([array] $batch)
      $batch |
      ForEach-Object {
        Start-ThreadJob -ThrottleLimit $BatchSize { Invoke-Expression ($using:exe + ' ' + ($using:exeArgList -f $args[0])) } -ArgumentList $_
      } |
      Receive-Job -Wait -AutoRemoveJob
    }
  }

  if (-not $noForEachParallel) {
    # If ForEach-Object -Parallel is supported (v7+), add an approach for it.
    $approachImpl['ForEach-Object -Parallel'] = {  
      param([array] $batch)
      $batch | ForEach-Object -ThrottleLimit $BatchSize -Parallel {
        Invoke-Expression ($using:exe + ' ' + ($using:exeArgList -f $_)) 
      }
    }
  }

  $approachImpl['Start-Process'] = { # direct execution of an external program
    param([array] $batch)
    $batch |
    ForEach-Object {
      Start-Process -NoNewWindow -PassThru $exe -ArgumentList ($exeArgList -f $_)
    } |
    Wait-Process # wait for all processes to terminate.
  }

  # Partition the array of all indices into subarrays (batches)
  $batches = @(
    0..([math]::Ceiling($zipFiles.Count / $batchSize) - 1) | ForEach-Object {
      , $zipFiles[($_ * $batchSize)..($_ * $batchSize + $batchSize - 1)]
    }
  )

  # In interactive use, print verbose messages by default
  if ($interactive) { $VerbosePreference = 'Continue' }

  :menu while ($true) {
    if ($interactive) {
      # Prompt for the approach to use.
      $choices = $approachImpl.Keys.ForEach({
        if ($_ -eq 'ForEach-Object -Parallel') { '&' + $_ }
        else { $_ -replace '-', '-&' }
      }) + '&Quit'
      $choice = $host.ui.PromptForChoice("Approach", "Select parallelism approach:", $choices, 0)
      if ($choice -eq $approachImpl.Count) { break }
      $approachKey = @($approachImpl.Keys)[$choice]
    }
    else {
      # Use the given approach(es)
      $approachKey = $approaches
    }
    $tsTotals = foreach ($appr in $approachKey) {
      $i = 0; $tsTotal = [timespan] 0
      $batches | ForEach-Object {
        $ts = Measure-Command { & $approachImpl[$appr] $_ | Out-Host }
        Write-Verbose "$batchSize-element '$appr' batch finished in $($ts.TotalSeconds.ToString('N2')) secs."
        $tsTotal += $ts
        if (++$i -eq $batches.Count) {
          # last batch processed.
          if ($batches.Count -gt 1) {
            Write-Verbose "'$appr' processing of $JobCount items overall finished in $($tsTotal.TotalSeconds.ToString('N2')) secs." 
          }
          $tsTotal # output the overall timing for this approach
        }
        elseif ($interactive) {
          $choice = $host.ui.PromptForChoice("Continue?", "Select action", ('&Next batch', '&Return to Menu', '&Quit'), 0)
          if ($choice -eq 1) { continue menu }
          if ($choice -eq 2) { break menu }
        }
      }
    }
    if (-not $interactive) {
      # Output a result object with the overall timings.
      $oht = [ordered] @{}; $i = 0
      $oht['JobCount'] = $JobCount
      $oht['BatchSize'] = $BatchSize
      $oht['BatchCount'] = $batches.Count
      foreach ($appr in $approachKey) {        
        $oht[($appr + ' (secs.)')] = $tsTotals[$i++].TotalSeconds.ToString('N2')
      }
      [pscustomobject] $oht
      break # break out of the infinite :menu loop
    }
  }

}
7rtdyuoh

7rtdyuoh3#

您可以在foreach循环中添加一个计数器,如果计数器达到所需的值,则中断循环

$numjobs = 5
$counter = 0
foreach ($i in $zipfiles) {
  $counter++
  if ($counter -ge $numjobs) {
    break 
  }
  <your code>
}

或者使用Powershells Foreach-Object

$numjobs = 5
$zipfiles | select -first $numjobs | Foreach-Object {
  <your code>
}

如果你想批量处理整个数组,并等待每个批处理完成,你必须保存Start-Job返回的对象,并将其传递给Wait-Job,如下所示:

$items = 1..100

$batchsize = 5

while ($true) {
    $jobs = @()
    $counter = 0
    foreach ($i in $items) {
        if ($counter -ge $batchsize) {
            $items = $items[$batchsize..($items.Length)]
            break 
        }
        $jobs += Start-Job -ScriptBlock { Start-Sleep 10 }
        $counter++
    }
    foreach ($job in $jobs) {
        $job | Wait-Job | Out-Null
    }
    if (!$items) {
        break
    }
}

根据设计,数组具有固定的长度,这就是为什么我用$items = $items[$batchsize..($items.Length)]重写整个数组

相关问题