我在理解如何正确地阻塞和关闭通道时遇到了一个问题。我正在启动一些任意数量的worker,我发现我的main函数在worker完成之前就退出了。我需要一个更好的方法来阻塞,而worker在没有main退出的情况下读取通道,然后优雅地关闭通道以结束循环。
我尝试了一些方法,包括使用等待组,但问题仍然存在。我注意到,通过添加time.Sleep
,程序可以正常工作,但是注解掉它的结果是没有完成任何工作。
time.Sleep(time.Duration(10 * time.Second))
更新
需要说明的是,worker()
是问题的根源,如果我把函数体移到main
中,程序就可以工作,但我不希望这样。
for i := range ch {
fmt.Println("channel val:", i)
}
我的意图是让n个独立的工作者开始,他们可以从并行的通道中平等地读取数据。
按照建议,在主体中将time.Sleep()
替换为wg.Wait()
,结果是fatal error: all goroutines are asleep - deadlock!
https://go.dev/play/p/Wa0DLMmnd8z
这里有一个可运行的示例https://go.dev/play/p/QHqNj-AJQBI,其中保留了Sleep
。这里是注解掉睡眠超时的损坏代码。
package main
import (
"fmt"
"sync"
"time"
)
// some complicated work
func do(num int, ch chan<- int) {
time.Sleep(time.Duration(500 * time.Millisecond))
ch <- num
}
func main() {
results := make(chan int)
// for some number of required complicated work
for i := 0; i < 53; i++ {
go do(i, results)
}
var wg sync.WaitGroup
// start 3 workers which can process results
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(id, results)
}(i)
}
// handle closing the channel when all workers complete
go func() {
wg.Wait()
close(results)
}()
//time.Sleep(time.Duration(10 * time.Second))
fmt.Println("donezo")
}
// process the results of do() in a meaningful way
func worker(id int, ch <-chan int) {
fmt.Println("starting worker", id)
for i := range ch {
fmt.Println("channel val:", i)
}
}
我也试过把defer wg.Done()
移到worker()
函数里面,但还是同样的问题,没有睡眠就不能工作。
// process the results of do() in a meaningful way
func worker(wg *sync.WaitGroup, id int, ch <-chan int) {
fmt.Println("starting worker", id)
defer wg.Done()
for i := range ch {
fmt.Println("channel val:", i)
}
}
是我选择了错误的模式,还是我只是使用了错误的模式?
1条答案
按热度按时间yeotifhr1#
使用此代码: