如果一个goroutine在go中出现错误,关闭多个goroutine

6tqwzwtp  于 2023-11-14  发布在  Go
关注(0)|答案(4)|浏览(119)

考虑这个函数:

func doAllWork() error {

    var wg sync.WaitGroup

    for i := 0; i < 2; i++ {

        wg.add(1)
        go func() {

            defer wg.Done()
            for j := 0; j < 10; j++ {
                result, err := work(j)
                if err != nil {
                    // can't use `return err` here
                    // what sould I put instead ? 
                    os.Exit(0)
                }
            }
        }()
    }
    wg.Wait()

    return nil
}

字符串
在每个goroutine中,函数work()被调用10次。如果对work()的一次调用在任何一个运行的goroutine中返回错误,我希望所有的goroutine立即停止,程序退出。这里可以使用os.Exit()吗?我应该如何处理?

编辑:这个问题与how to stop a goroutine不同,因为这里我需要在一个goroutine出现错误时关闭所有goroutine。

xvw2m8pv

xvw2m8pv1#

您可以使用context包,它是为这样的事情创建的(“携带截止日期,取消信号...")。
你创建了一个能够用context.WithCancel()发布取消信号的上下文(父上下文可能是context.Background()返回的那个)。这将返回一个cancel()函数,它可以用来向工作goroutine取消(或者更准确地说,* 信号 * 取消意图)。
在worker goroutines中,你必须检查这样的意图是否已经被启动,通过检查Context.Done()返回的通道是否关闭,最简单的方法是尝试从它那里接收(如果它关闭了,它会立即继续)。要做一个非阻塞检查(如果它没有关闭,你可以继续),使用带有default分支的select语句。
我将使用下面的work()实现,它模拟10%的失败机会,并模拟1秒的工作:

func work(i int) (int, error) {
    if rand.Intn(100) < 10 { // 10% of failure
        return 0, errors.New("random error")
    }
    time.Sleep(time.Second)
    return 100 + i, nil
}

字符串
doAllWork()可能看起来像这样:

func doAllWork() error {
    var wg sync.WaitGroup

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // Make sure it's called to release resources even if no errors

    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()

            for j := 0; j < 10; j++ {
                // Check if any error occurred in any other gorouties:
                select {
                case <-ctx.Done():
                    return // Error somewhere, terminate
                default: // Default is must to avoid blocking
                }
                result, err := work(j)
                if err != nil {
                    fmt.Printf("Worker #%d during %d, error: %v\n", i, j, err)
                    cancel()
                    return
                }
                fmt.Printf("Worker #%d finished %d, result: %d.\n", i, j, result)
            }
        }(i)
    }
    wg.Wait()

    return ctx.Err()
}


这是如何测试它:

func main() {
    rand.Seed(time.Now().UnixNano() + 1) // +1 'cause Playground's time is fixed
    fmt.Printf("doAllWork: %v\n", doAllWork())
}


输出(在Go Playground上尝试):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #1 during 4, error: random error
Worker #0 finished 3, result: 103.
doAllWork: context canceled


如果没有错误,例如使用以下work()函数时:

func work(i int) (int, error) {
    time.Sleep(time.Second)
    return 100 + i, nil
}


输出将像这样(在Go Playground上尝试):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
Worker #0 finished 4, result: 104.
Worker #1 finished 4, result: 104.
Worker #1 finished 5, result: 105.
Worker #0 finished 5, result: 105.
Worker #0 finished 6, result: 106.
Worker #1 finished 6, result: 106.
Worker #1 finished 7, result: 107.
Worker #0 finished 7, result: 107.
Worker #0 finished 8, result: 108.
Worker #1 finished 8, result: 108.
Worker #1 finished 9, result: 109.
Worker #0 finished 9, result: 109.
doAllWork: <nil>

备注:

基本上,我们只是使用了上下文的Done()通道,所以看起来我们可以很容易地(如果不是更容易的话)使用done通道而不是Context,关闭通道来做上面解决方案中cancel()所做的事情。
这是不正确的。**这只能在只有一个goroutine可以关闭通道的情况下使用,但在我们的情况下,任何工作线程都可以这样做。(详见此处:未初始化的通道如何运行?)。因此,您必须确保close(done)周围的某种同步/排除,实际上这正是cancel()函数在幕后所做的,隐藏/抽象远离你的眼睛,所以cancel()可能会被多次调用,以使你的代码/使用更简单。

如何获取并返回worker的错误?

为此,您可以使用错误通道:

errs := make(chan error, 2) // Buffer for 2 errors


在worker内部,当遇到错误时,在通道上发送它而不是打印它:

result, err := work(j)
if err != nil {
    errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err)
    cancel()
    return
}


在循环之后,如果有错误,返回它(否则返回nil):

// Return (first) error, if any:
if ctx.Err() != nil {
    return <-errs
}
return nil


这次的输出(在Go Playground上试试):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
doAllWork: Worker #1 during 4, error: random error

请注意,我使用了一个缓冲通道,其缓冲区大小等于工作线程的数量,这确保了在其上发送总是无阻塞的。这也使您有可能接收和处理所有错误,而不仅仅是一个错误(例如第一个错误)。另一种选择是使用一个缓冲通道只保存1,并在其上执行无阻塞发送,可能如下所示:

errs := make(chan error, 1) // Buffered only for the first error

// ...and inside the worker:

result, err := work(j)
if err != nil {
    // Non-blocking send:
    select {
    case errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err):
    default:
    }
    cancel()
    return
}
0mkxixxg

0mkxixxg2#

一种更清晰的方法是使用errgroupdocumentation)。
errgroup为处理公共任务的子任务的goroutine组提供了同步、错误传播和上下文取消。
您可以在此示例(playground)中查看它:

var g errgroup.Group
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/",
    }

    for _, url := range urls {
        // Launch a goroutine to fetch the URL.
        url := url // https://golang.org/doc/faq#closures_and_goroutines
        
       g.Go(func() error {
            // Fetch the URL.
            resp, err := http.Get(url)
            if err == nil {
                resp.Body.Close()
            }
            return err
        })
    }
   
    // Wait for all HTTP fetches to complete.
    if err := g.Wait(); err == nil {
        fmt.Println("Successfully fetched all URLs.")
    
    } else {

        // After all have run, at least one of them has returned an error!
       // But all have to finish their work!
       // If you want to stop others goroutines when one fail, go ahead reading!
        fmt.Println("Unsuccessfully fetched URLs.")
    }

字符串
但是请注意:Go文档中的The first call to return a non-nil error cancels the group短语有点误导。
实际上,如果用一个上下文(WithContext函数)创建了errgroup.Group,则当组中的goroutine返回错误时,会调用WithContext返回的上下文的cancel函数,否则什么也不会做(阅读此处的源代码!)。
所以,如果你想关闭你的不同goroutine,你必须使用我的WithContext返回的上下文,并在它们里面自己管理它,errgroup只会关闭这个上下文!Here you can find an example.
总而言之,errgroup可以以不同的方式使用,如示例所示。“
1.“just errors”,如上面的例子:Wait等待所有goroutine结束,然后返回第一个非nil错误(如果有),或者返回nil
1.并行:您必须使用WithContext函数创建组,并使用上下文来管理上下文关闭。I created a playground example here with some sleeps!您必须手动关闭每个goroutine,但使用上下文可以在关闭上下文时结束它们。
1.管道(请参阅示例中的更多内容)。

qaxu7uf2

qaxu7uf23#

从go1.20开始,context包现在有了WithCancelCause,它可以用于错误传播。请参阅https://pkg.go.dev/context#WithCancelCause

ctx, cancel := context.WithCancelCause(parent)
cancel(myError)
ctx.Err() // returns context.Canceled
context.Cause(ctx) // returns myError

字符串

zfycwa2u

zfycwa2u4#

另一种方法是使用errgroup.WithContext。你可以在这个example中检查它。
简单地说,g.Wait()等待第一个错误发生,或者所有的goroutine都没有错误地完成。当任何一个goroutine发生错误时(在提供的例子中超时),它会通过ctx.Done()通道取消其他goroutine的执行。

相关问题