sync/workerpool
这个包将提供一个具有简单任务接口的标准并发工作池实现。
package workerpool
type Task interface {
Invoke(ctx context.Context) error
}
type WorkerPool struct {}
func New(n int64) WorkerPool
func (p *WorkerPool) Push(t Task, opts ...TaskOption)
func (p *WorkerPool) Run(ctx context.Context)
func (p WorkerPool) Wait()
示例
package main
import (
"context"
"golang.org/x/sync/workerpool"
)
func main() {
wp := workerpool.New(2)
ctx, cancel := context.WithCancel(context.Background())
go wp.Run(ctx) // runs until context is cancelled
// wp.Push(Task 1)
// wp.Push(Task 2)
wp.Wait() // blocks until all pending tasks are complete, but does not stop workerpool goroutine
cancel() // stops the workerpool
// wait for the workerpool to be stopped
select {
case <-ctx.Done():
}
}
原因
虽然互联网上有很多过于简单的例子,但当尝试编写更健壮的自定义实现时,问题空间会很快变得困难。我相信社区将从这样一个健壮的实现中受益匪浅,它可以在标准库中广泛使用。
我已经编写了 github.com/tniswong/workerpool 作为草案设计,我将其作为候选实现提供。这个设计使用 golang.org/x/sync/semaphore
来限制并发工作者。
设计说明
- 用于
WorkerPool.Run(context.Context)
和Task.Invoke(context.Context)
的context.Context
- 任务队列没有代码定义的大小限制
- 任务在完成时负责收集自己的结果(如果适用)
Push()
是并发安全的Push()
可以提供选项来指定任务调用行为,如重试(如果返回错误则重新启动任务)和重试最大值(除非返回错误超过 n 次,否则不重新启动任务)- 当 Run(ctxt) 被取消时,不会调用排队的任务。当 Run(ctxt) 被取消时,剩余的任务将以取消的上下文调用,以清除工作队列。
- 由于信号量,没有悬挂的工作线程需要清理
Run()
直到上下文被取消才运行
5条答案
按热度按时间xv8emn3q1#
看起来 https://pkg.go.dev/golang.org/x/sync/errgroup 具备了这里描述的大部分功能。你是否考虑过改进 https://pkg.go.dev/golang.org/x/sync/errgroup 而不是添加一个新的?
mitkmikd2#
老实说,我不知道那个包存在。我搜索的与工作池模式相关的信息从未出现过
sync/errgroup
作为选项。虽然我承认一开始看起来确实有很大的功能重叠,但我不太相信这些不是两个不同的用例。
根据我对工作池的理解(可能是天真的),一个池子只是简单地(并持续地)将任务分派给一组并发工作者,而不管任务结果如何,并且在池被停止之前一直可用来运行未来的任务。我相信
sync/errgroup
(而且我只简短地研究过这个包,所以很可能我是错的),另一方面,会在遇到任何任务错误时终止,并且只有在没有任务出错的情况下才能继续使用。假设我的观察是正确的,我可以看到
sync/errgroup
方法和这个提案提供的明显价值。xdnvmnnf3#
Run
方法使得这个API容易出现goroutine泄漏和同步错误:很容易意外泄漏Run
goroutine,尤其是它没有提供一种机制让该goroutine完成。(更多细节,请参见 my GopherCon '18 talk ,特别是从第75张幻灯片开始。)这就剩下了
New
、Push
和Wait
,它们分别类似于errgroup.Group
的SetLimit
( #27837 )、Go
和Wait
,但您是正确的,errgroup
专门关注错误聚合(以促进分叉/合并式并发),而这里提议的API故意不这样做(以促进重用)。这使得它更类似于
cmd/go/internal/par.Queue
,它只提供了NewQueue(maxActive int)
、Add(func())
和Idle() <-chan struct{}
。我认为这是这里更合适的API——它具有相同的并发限制属性,但没有易泄漏的Run
方法,并且在完成后有一种稍微更灵活的方式来select
。kiayqfof4#
Run
方法使得这个API容易出现goroutine泄漏和同步错误:很容易意外泄漏Run
goroutine,尤其是考虑到它没有一个机制来让这个goroutine完成。(更多细节,请参见 my GopherCon '18 talk ,特别是从第75张幻灯片开始。)实际上,这个问题已经出现在我的脑海里了,但我决定暂时不做处理,等待一些反馈。一个想法是添加一个
func (p WorkerPool) WaitStop()
来等待池本身优雅地停止。此外,这个设计不需要将Run(context.Context)
作为goroutine运行,它完全可以正常调用并阻塞,直到上下文被取消。任务也可以通过在调用Run(context.Context)
之前或从另一个goroutine中添加Push()
来添加,但我不确定这是否缓解了您的担忧。在了解到
sync/errgroup
的存在后,我一直在阅读它的历史,并偶然发现了您的演讲。今晚会听一听。在此期间,非常有兴趣听到其他关于如何应对这个问题的建议。剩下的
New
、Push
和Wait
与errgroup.Group
的SetLimit
( #27837 ) 、Go
和Wait
分别相似,但您说得对,errgroup
专门关注错误聚合(以促进分叉/合并式的并发),而这里提议的API故意不这样做(以促进重用)。这使得它更像
cmd/go/internal/par.Queue
,它只提供NewQueue(maxActive int)
、Add(func())
和Idle() <-chan struct{}
。我认为这是这里更合适的API——它具有相同的并发限制属性,但没有易泄漏的Run
方法,并且在完成时有一种稍微更灵活的方式来select
。我需要研究一下
par.Queue
才能做出恰当的评论,但我的理解是它是一个内部包,因此无法使用。您是在建议使这个API通常可用还是将其用作修改此提案定义的API的参考?乍一看,par.Queue的人体工程学似乎比我想要的更“聪明”。WaitGroup风格的
Wait()
感觉非常清晰自然。感谢您的反馈!
k4emjkb15#
我终于有了一些时间来回顾这个,并采纳了来自@bcmills的反馈(顺便说一下,谢谢你):
同步/工作线程池
示例
我还更新了我参考实现:
https://pkg.go.dev/github.com/tniswong/workerpool/v2@v2.0.0
https://github.com/tniswong/workerpool/tree/master/v2