go 建议:同步:添加包 sync/workerpool

wnavrhmk  于 5个月前  发布在  Go
关注(0)|答案(5)|浏览(44)

sync/workerpool

这个包将提供一个具有简单任务接口的标准并发工作池实现。

package workerpool

type Task interface {
	Invoke(ctx context.Context) error
}

type WorkerPool struct {}

func New(n int64) WorkerPool
func (p *WorkerPool) Push(t Task, opts ...TaskOption)
func (p *WorkerPool) Run(ctx context.Context)
func (p WorkerPool) Wait()

示例

package main

import (
    "context"
    "golang.org/x/sync/workerpool"
)

func main() {

    wp := workerpool.New(2)
    ctx, cancel := context.WithCancel(context.Background())
    
    go wp.Run(ctx) // runs until context is cancelled

    // wp.Push(Task 1)
    // wp.Push(Task 2)

    wp.Wait() // blocks until all pending tasks are complete, but does not stop workerpool goroutine
    cancel() // stops the workerpool

    // wait for the workerpool to be stopped
    select {
    case <-ctx.Done():
    }

}

原因

虽然互联网上有很多过于简单的例子,但当尝试编写更健壮的自定义实现时,问题空间会很快变得困难。我相信社区将从这样一个健壮的实现中受益匪浅,它可以在标准库中广泛使用。
我已经编写了 github.com/tniswong/workerpool 作为草案设计,我将其作为候选实现提供。这个设计使用 golang.org/x/sync/semaphore 来限制并发工作者。

设计说明

  • 用于 WorkerPool.Run(context.Context)Task.Invoke(context.Context)context.Context
  • 任务队列没有代码定义的大小限制
  • 任务在完成时负责收集自己的结果(如果适用)
  • Push() 是并发安全的
  • Push() 可以提供选项来指定任务调用行为,如重试(如果返回错误则重新启动任务)和重试最大值(除非返回错误超过 n 次,否则不重新启动任务)
  • 当 Run(ctxt) 被取消时,不会调用排队的任务。当 Run(ctxt) 被取消时,剩余的任务将以取消的上下文调用,以清除工作队列。
  • 由于信号量,没有悬挂的工作线程需要清理
  • Run() 直到上下文被取消才运行
xv8emn3q

xv8emn3q1#

看起来 https://pkg.go.dev/golang.org/x/sync/errgroup 具备了这里描述的大部分功能。你是否考虑过改进 https://pkg.go.dev/golang.org/x/sync/errgroup 而不是添加一个新的?

mitkmikd

mitkmikd2#

老实说,我不知道那个包存在。我搜索的与工作池模式相关的信息从未出现过 sync/errgroup 作为选项。
虽然我承认一开始看起来确实有很大的功能重叠,但我不太相信这些不是两个不同的用例。
根据我对工作池的理解(可能是天真的),一个池子只是简单地(并持续地)将任务分派给一组并发工作者,而不管任务结果如何,并且在池被停止之前一直可用来运行未来的任务。我相信 sync/errgroup (而且我只简短地研究过这个包,所以很可能我是错的),另一方面,会在遇到任何任务错误时终止,并且只有在没有任务出错的情况下才能继续使用。
假设我的观察是正确的,我可以看到 sync/errgroup 方法和这个提案提供的明显价值。

xdnvmnnf

xdnvmnnf3#

Run 方法使得这个API容易出现goroutine泄漏和同步错误:很容易意外泄漏 Run goroutine,尤其是它没有提供一种机制让该goroutine完成。(更多细节,请参见 my GopherCon '18 talk ,特别是从第75张幻灯片开始。)
这就剩下了 NewPushWait ,它们分别类似于 errgroup.GroupSetLimit ( #27837 )、 GoWait ,但您是正确的, errgroup 专门关注错误聚合(以促进分叉/合并式并发),而这里提议的API故意不这样做(以促进重用)。
这使得它更类似于 cmd/go/internal/par.Queue,它只提供了 NewQueue(maxActive int)Add(func())Idle() <-chan struct{} 。我认为这是这里更合适的API——它具有相同的并发限制属性,但没有易泄漏的 Run 方法,并且在完成后有一种稍微更灵活的方式来 select

kiayqfof

kiayqfof4#

Run 方法使得这个API容易出现goroutine泄漏和同步错误:很容易意外泄漏 Run goroutine,尤其是考虑到它没有一个机制来让这个goroutine完成。(更多细节,请参见 my GopherCon '18 talk ,特别是从第75张幻灯片开始。)

实际上,这个问题已经出现在我的脑海里了,但我决定暂时不做处理,等待一些反馈。一个想法是添加一个 func (p WorkerPool) WaitStop() 来等待池本身优雅地停止。此外,这个设计不需要将 Run(context.Context) 作为goroutine运行,它完全可以正常调用并阻塞,直到上下文被取消。任务也可以通过在调用 Run(context.Context) 之前或从另一个goroutine中添加 Push() 来添加,但我不确定这是否缓解了您的担忧。

在了解到 sync/errgroup 的存在后,我一直在阅读它的历史,并偶然发现了您的演讲。今晚会听一听。在此期间,非常有兴趣听到其他关于如何应对这个问题的建议。

剩下的 NewPushWaiterrgroup.GroupSetLimit ( #27837 ) 、 GoWait 分别相似,但您说得对, errgroup 专门关注错误聚合(以促进分叉/合并式的并发),而这里提议的API故意不这样做(以促进重用)。

这使得它更像 cmd/go/internal/par.Queue,它只提供 NewQueue(maxActive int)Add(func())Idle() <-chan struct{} 。我认为这是这里更合适的API——它具有相同的并发限制属性,但没有易泄漏的 Run 方法,并且在完成时有一种稍微更灵活的方式来 select

我需要研究一下 par.Queue 才能做出恰当的评论,但我的理解是它是一个内部包,因此无法使用。您是在建议使这个API通常可用还是将其用作修改此提案定义的API的参考?

乍一看,par.Queue的人体工程学似乎比我想要的更“聪明”。WaitGroup风格的 Wait() 感觉非常清晰自然。

感谢您的反馈!

k4emjkb1

k4emjkb15#

我终于有了一些时间来回顾这个,并采纳了来自@bcmills的反馈(顺便说一下,谢谢你):

同步/工作线程池

package workerpool

type Task interface {
	Invoke(ctx context.Context) error
}

type WorkerPool struct {}

func New(n int64) WorkerPool
func (p *WorkerPool) Push(t Task, opts ...TaskOption)
func (p *WorkerPool) Run(ctx context.Context) <-chan struct{}

示例

package main

import (
    "context"
    "golang.org/x/sync/workerpool"
    "time"
)

func main() {

    pool := workerpool.New(2)
    ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second)
    defer cancel()
    
    done := wp.Run(ctx) // runs until context is cancelled

    // pool.Push(Task 1)
    // pool.Push(Task 2)

    // block until the workerpool is stopped and done channel is closed
    <-done

}

我还更新了我参考实现:
https://pkg.go.dev/github.com/tniswong/workerpool/v2@v2.0.0
https://github.com/tniswong/workerpool/tree/master/v2

相关问题