Go语言 阅读两个通道,项目数未知

enyaitl3  于 2023-04-03  发布在  Go
关注(0)|答案(2)|浏览(102)

尝试生成数字并将其发送到另一个函数将读取的通道中,然后根据是否有错误执行一些操作并返回两个通道。我不知道会有多少错误,因此out和errorCh有一个defer close来表示没有更多的项目。
在本例中,我有一个通道,我将数字发送到该通道,然后将该通道传递给一个函数,该函数返回两个通道,out和errorCh。最后,我从out和errorCh中读取值。
我延迟关闭in通道,这样当下游的run函数没有更多的值发送时,它就知道没有更多的数字要处理了。这意味着run函数将在从in获得关闭通道信号后完成。这也将关闭out和errorCh,这样select语句将获得这两个通道关闭的信号。我是否没有正确关闭其中一个通道?为什么会出现僵局?
https://go.dev/play/p/hKSkfk0VQ2d

package main

import (
    "errors"
    "fmt"
)

func run(in <-chan int) (chan int, chan error) {
    out := make(chan int)
    errorCh := make(chan error)

    go func() {
        defer close(out)
        defer close(errorCh)

        for i := range in {
            if i%2 == 0 {
                out <- i
            } else {
                errorCh <- errors.New("we don't like odd numbers")
            }
        }
    }()
    return out, errorCh
}

func main() {
    in := make(chan int)
    out := make(chan int)
    errors := make(chan error)

    // generate numbers to send to the "in" channel
    go func(in chan int) {
        defer close(in)
        for i := 0; i < 10; i++ {
            fmt.Println("  input", i)
            in <- i
        }
    }(in)

    // run function reads from the "in" channel, does something and returns
    // the "out" channel and "errorCh"
    go func(in chan int) {
        out, errors = run(in)
    }(in)

    // read from the "out" channel and "errorCh" until all of the numbers
    // in the "in" channel have been run
    for {
        select {
        case i, ok := <-out:
            if !ok { // out channel is closed
                return // Done
            }
            fmt.Println("done", i)
        case err, ok := <-errors:
            if !ok {
                return
            }
            if err != nil {
                fmt.Println(err)
            }
        }
    }
}
0vvn1miw

0vvn1miw1#

死锁是非常有趣的,有多种原因可能导致死锁。
1.如果从尚未准备好放入数据或尚未写入数据的通道阅读数据。
1.如果您正在写入通道,但在写入时没有人在那里读取该数据。
在你的例子中,你在main函数中提到的out通道和你在run函数中提到的out通道是完全不同的。虽然你试图从out, errors = run(in)复制值,但不用说,这只有当out <- irun函数中执行时才能成功。
现在,在case i, ok := <-out:main函数中,你试图从out通道中读取值。现在让我再次提醒你,你没有在两个go例程中使用相同的out通道,所以Go运行时只是试图从<-out读取值,甚至在out <- i发生之前,正如@zerkms所提到的那样。
所以这福尔斯在写入之前从通道阅读的范畴。另一方面,写入部分也在等待一些goroutine读取。这意味着读取部分等待写入,写入部分等待读取,**典型的死锁条件。
所以准确地说,你的代码可能没有竞争,但它有死锁
我假设你正在尝试创建一个生产者消费者模式。

package main

import (
    "errors"
    "fmt"
)

func Consume(in, out chan int, err chan error) {
    defer close(out)
    defer close(err)
    for i := range in {
        if i%2 == 0 {
            out <- i
        } else {
            err <- errors.New("we don't like odd numbers")
        }
    }
}

func main() {
    in := make(chan int)
    out := make(chan int)
    err := make(chan error)
    go func(in chan int) {
        defer close(in)
        for i := 0; i < 10; i++ {
            fmt.Println("  input", i)
            in <- i
        }
    }(in)

    go Consume(in, out, err)

    for {
        select {
        case i, ok := <-out:
            if !ok {
                return
            }
            fmt.Printf("Processed Data %d\n", i)
        case er, ok := <-err:
            if !ok {
                return
            }
            if er != nil {
                fmt.Println(er)
            }
        }
    }
}

请注意我是如何在main中定义通道并传递这些通道,而不是重新定义通道

hmmo2u0o

hmmo2u0o2#

这里有些事你得解决
这导致了@zerkms的竞争条件:

go func(in chan int) {
        out, errors = run(in)
    }(in)

out, errors必须被设置为run(in) * 的结果,然后才能在下面的通道上启动select,所以你不想在goroutine中运行它。你在main的开头make d它们,然后在run中重新make d它们。

out, errors = run(in)

这使得main对通道outerrorsmake操作是浪费的,因为run将创建新的通道。
一个二个一个一个
像这样把通道传递给函数是一个很好的方法来使作用域显式。对于复杂的程序,我避免定义非triial的goroutine函数内联,以免它不共享作用域。
除此之外,your code will run,但它缺少并行处理的一个关键组件:你没有并行地做任何事情,你只有一个goroutine的工人,就是run创建的工人。
作为一个工作负载,确定一个整数的parity是一个很好的概念性替代,可以轻松地并行化工作。但是你一次处理一个项目的工作-实际上,通道在一次可以处理 * 多个 * 项目的工作的情况下很有用。
如果你有多个worker,你不能关闭每个worker中的outerror通道。没有worker知道其他worker是否完成了处理。事实上,你的goroutine目前都不知道 * 所有 * worker是否完成了处理。你知道in是关闭的,但如果有多个worker,这并不意味着X1 M15 N1 X的所有消息都完成了处理。
这就是sync.WaitGroup的用武之地。使用sync.WaitGroup,您可以计算已生成的工作线程的数量,然后等待它们全部完成。这允许您在所有工作线程完成后关闭outerrors
首先,我们将重写worker,使其能够被多次调用。我们的worker本质上是相同的,但不是关闭outerrs,而是用wg.Done()表示我们完成了。

func worker(wg *sync.WaitGroup, in <-chan int, out chan<- int, errs chan<- error) {
    defer wg.Done()
    for i := range in {
        if i%2 == 0 {
            out <- i
        } else {
            errs <- fmt.Errorf("we don't like odd numbers (%d)", i)
        }
    }
}

现在我们需要创造一些工人

var in = make(chan int)
    var out = make(chan int)
    var errs = make(chan error)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go worker(&wg, in, out, errs)
    }

一个goroutine在关闭outerrs之前等待等待组

go func() {
        wg.Wait()
        close(errs)
        close(out)
    }()

我们还需要为两个通道调整此行为:

case i, ok := <-out:
            if !ok { // out channel is closed
                return // Done
            }

第一,更正:

if !ok { // out channel is closed

!ok表示通道已关闭 * 且为空 *。
对于一个worker,关闭outerrors是程序完成的等效信号,并且您知道通道上不会再有发送,因为一个worker正在退出。但是对于许多worker,我们不能确定所有通道的所有消息都已被发送。如果outerrors中的一个为空并关闭,那么我们可以通过关闭outerrors来确定所有通道的消息都已被发送。在接收到另一信道的消息之前,程序结束。
相反,使用Go的一个简洁的特性:select不会从nil通道读取。因此,当通道关闭并变为空时,我们可以将它们设置为nil。当它们都是nil时,我们就完成了。

for errs != nil || out != nil {
        select {
        case i, ok := <-out:
            if !ok { // out channel is closed
                out = nil
            }
            fmt.Println("done", i)
        case err, ok := <-errs:
            if !ok {
                errs = nil
            }
            if err != nil {
                fmt.Println(err)
            }
        }
    }

Here's the whole thing .
像这样一起使用等待组和通道是一种常见的做法。通常,我发现有一个由输入项,reuslt和错误组成的返回类型比有两个返回通道更简单,更有用,一个用于结果,一个用于错误。

type workResult struct {
   err error
   parity bool
   input int
}

通常在并行化工作时,能够将输入与错误关联起来是很有用的。这样你就不必做任何select操作,收集结果看起来更像是一个简单的:

for res := range results {
   if res.err != nil {  
      // handle error ... 
   } else {
      // do whatever with results
   }
}

处理多个错误时,可以考虑使用https://pkg.go.dev/github.com/hashicorp/go-multierror,这是一种有用的方法,可以收集错误并将其合并为一个最终值,同时仍然允许成功的工作继续。

相关问题