csv Goroutine之间的同步问题,一个阻塞另一个

vbkedwbf  于 2022-12-06  发布在  Go
关注(0)|答案(2)|浏览(134)

我试图处理一个CSV文件,我从AWS S3阅读,对于每一行文本,我想激活worker函数做一些工作,并返回一个结果
理想情况下,我希望结果按原始CSV排序,但这不是必需的,因为某些原因,当我运行这段代码时,我会得到奇怪的数据竞争和这行代码:

for result := range output {
   results = append(results, result)
}

永久块
我尝试使用WaitGroup,但也不起作用,关闭output通道也会导致“尝试将某些内容放入关闭的通道”错误

func main() {
    resp, err := ReadCSV(bucket, key)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()
    reader := csv.NewReader(resp.Body)

    detector := NewDetector(languages)
    var results []DetectionResult

    numWorkers := 4
    input := make(chan string, numWorkers)
    output := make(chan DetectionResult, numWorkers)

    start := time.Now()

    for w := 1; w < numWorkers+1; w++ {
        go worker(w, detector, input, output)
    }

    go func() {
        for {
            record, err := reader.Read()
            if err == io.EOF {
                close(input)
                break
            }

            if err != nil {
                log.Fatal(err)
            }

            text := record[0]
            input <- text
        }
    }()

    for result := range output {
        results = append(results, result)
    }

    elapsed := time.Since(start)

    log.Printf("Decoded %d lines of text in %s", len(results), elapsed)
}

func worker(id int, detector lingua.LanguageDetector, input chan string, output chan DetectionResult) {
    log.Printf("worker %d started\n", id)
    for t := range input {
        result := DetectText(detector, t)
        output <- result
    }
    log.Printf("worker %d finished\n", id)
}

尝试处理CSV(理想情况下按顺序),并使用worker函数调用的结果来丰富它
尝试设置WaitGroup,尝试在完成阅读(EOF)时关闭输出通道-导致错误

abithluo

abithluo1#

for循环会一直读到output通道关闭,你必须在处理完所有输入后关闭output通道(而不是在阅读输入后)。
您可以使用等待组来执行以下操作:

func worker(detector lingua.LanguageDetector, wg *sync.WaitGroup) func(id int, input chan string, output chan DetectionResult) {
   wg.Add(1)
   return func(id int, input chan string, output chan DetectionResult) {
      defer wg.Done() // Notify wg when processing is finished
      log.Printf("worker %d started\n", id)
      for t := range input {
         result := DetectText(detector, t)
         output <- result
      }
      log.Printf("worker %d finished\n", id)
   }
}

然后道:

go func() {
    wg.Wait()
    close(output)
}()
for result := range output {
        results = append(results, result)
}
xoefb8l8

xoefb8l82#

我发现你缺少了一种方法来通知工人没有工作了,他们应该停止工作。你还需要一种方法来让工人发出他们确实完成了的信号。当所有这些信号都被发送和接收后,main应该控制所有工人的累积结果。
我们可以在所有CSV记录迭代完毕,并且所有作业都通过input发送后,通过关闭input向工作者发出信号:

nWorkers := 4

input := make(chan Tx, nWorkers*2) // buffer so input (the "jobs queue") is always full; see rationale at bottom of answer
output := make(chan Ty)
done := make(chan bool)

for i := 1; i < nWorkers+1; i++ {
    go worker(input, output, done)
}

go func() {
    for {
        record, _ := reader.Read()
        input <- record[0]
    }
    close(input)
}()

发送输入任务的goroutine可以在没有任务的时候安全地关闭输入任务,工作线程仍然可以接收到输入任务,即使它关闭了。
当输入被关闭并最终为空时,工作线程的range循环退出,然后工作线程通过在done通道上发送信号返回:

func worker(input <-chan Tx, output chan<- Ty, done <-chan bool) {
    for x := range input { // loop until input is closed
        output <- doWork(x)
    }
    done <- true // finally send done
}

当我们收到nWorker-number of done消息时,我们就知道所有工作都已完成,工作线程不会在输出时发送,因此可以安全地关闭输出:

go func() {
    log.Println("counting done workers")
    var doneCtr int
    for {
        select {
        case <-done:
            log.Println("got done")
            doneCtr++
        }

        if doneCtr == nWorkers {
            close(output) // signal the results appender to stop
            log.Println("closed output")
        }
    }
}()

关闭输出是向main发出的信号,表明它可以停止尝试接收和累积结果:

results := make([]result, 0)
for result := range output {
    results = append(results, result)
}

最后:所有其他的goroutine都已经终止了,main可以继续处理累积的结果。
至于按原始顺序获取结果,只需将原始顺序与每个作业一起发送,然后将该顺序与结果一起发回,然后按顺序排序即可:

type row struct {
    num  int
    text string
}

type result struct {
    lang language
    row  row
}

...

input <- row{rowNum, record[0]}
rowNum++

...

output <- result{detect(row.text), row}

...

results = append(results, result)

...

sort.Slice(results, func(i, j int) bool { return results[i].row.num < results[j].row.num})

我在The Go Playground中制作了一个完整的工作模型。
我的推理可能在缓冲上是错误的,但是,在我看来,唯一真实的令人失望的是发现工作者在等待输入时停顿了。缓冲输入的工作者数量是工作者数量的2倍,这确保了每个工作者在任何时刻平均有两个作业在等待。

相关问题