我试图处理一个CSV文件,我从AWS S3阅读,对于每一行文本,我想激活worker
函数做一些工作,并返回一个结果
理想情况下,我希望结果按原始CSV排序,但这不是必需的,因为某些原因,当我运行这段代码时,我会得到奇怪的数据竞争和这行代码:
for result := range output {
results = append(results, result)
}
永久块
我尝试使用WaitGroup,但也不起作用,关闭output
通道也会导致“尝试将某些内容放入关闭的通道”错误
func main() {
resp, err := ReadCSV(bucket, key)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
reader := csv.NewReader(resp.Body)
detector := NewDetector(languages)
var results []DetectionResult
numWorkers := 4
input := make(chan string, numWorkers)
output := make(chan DetectionResult, numWorkers)
start := time.Now()
for w := 1; w < numWorkers+1; w++ {
go worker(w, detector, input, output)
}
go func() {
for {
record, err := reader.Read()
if err == io.EOF {
close(input)
break
}
if err != nil {
log.Fatal(err)
}
text := record[0]
input <- text
}
}()
for result := range output {
results = append(results, result)
}
elapsed := time.Since(start)
log.Printf("Decoded %d lines of text in %s", len(results), elapsed)
}
func worker(id int, detector lingua.LanguageDetector, input chan string, output chan DetectionResult) {
log.Printf("worker %d started\n", id)
for t := range input {
result := DetectText(detector, t)
output <- result
}
log.Printf("worker %d finished\n", id)
}
尝试处理CSV(理想情况下按顺序),并使用worker
函数调用的结果来丰富它
尝试设置WaitGroup,尝试在完成阅读(EOF)时关闭输出通道-导致错误
2条答案
按热度按时间abithluo1#
for循环会一直读到
output
通道关闭,你必须在处理完所有输入后关闭output
通道(而不是在阅读输入后)。您可以使用等待组来执行以下操作:
然后道:
xoefb8l82#
我发现你缺少了一种方法来通知工人没有工作了,他们应该停止工作。你还需要一种方法来让工人发出他们确实完成了的信号。当所有这些信号都被发送和接收后,main应该控制所有工人的累积结果。
我们可以在所有CSV记录迭代完毕,并且所有作业都通过input发送后,通过关闭input向工作者发出信号:
发送输入任务的goroutine可以在没有任务的时候安全地关闭输入任务,工作线程仍然可以接收到输入任务,即使它关闭了。
当输入被关闭并最终为空时,工作线程的range循环退出,然后工作线程通过在done通道上发送信号返回:
当我们收到nWorker-number of done消息时,我们就知道所有工作都已完成,工作线程不会在输出时发送,因此可以安全地关闭输出:
关闭输出是向main发出的信号,表明它可以停止尝试接收和累积结果:
最后:所有其他的goroutine都已经终止了,main可以继续处理累积的结果。
至于按原始顺序获取结果,只需将原始顺序与每个作业一起发送,然后将该顺序与结果一起发回,然后按顺序排序即可:
我在The Go Playground中制作了一个完整的工作模型。
我的推理可能在缓冲上是错误的,但是,在我看来,唯一真实的令人失望的是发现工作者在等待输入时停顿了。缓冲输入的工作者数量是工作者数量的2倍,这确保了每个工作者在任何时刻平均有两个作业在等待。