一个使用golang并行计算的例子

x33g5p2x  于2021-12-30 转载在 Go  
字(1.4k)|赞(0)|评价(0)|浏览(461)

sync.WaitGroup的工作方式类似java里面的CountDownLatch,用于计数并阻塞等待。

下面的例子,我们来编写一个并行任务,以并发的方式处理一个批次的任务,可以控制它的最大并发度,并在所有任务全部完成后返回一个结果。

import (
	"fmt"
	"sync"
	"time"
	"unicode/utf8"
)

func concurrentPrinting(tasks []string, maxConcurrent int) int {
	tokens := make(chan struct{}, maxConcurrent)
	results := make(chan int, len(tasks))
	var wg sync.WaitGroup
	wg.Add(len(tasks))

	for _, t := range tasks {
		tokens <- struct{}{}
		go func(str string) {
			time.Sleep(time.Second * 2)
			fmt.Println(str)
			results <- utf8.RuneCountInString(str)
			<-tokens
			wg.Done()
		}(t)
	}

	wg.Wait()
	close(results)
	var totalCount int
	for count := range results {
		totalCount += count
	}
	return totalCount
}

func main() {
	tasks := []string{"hello", "world", "golang", "foo", "bar", "1024", "2048"}
	totalCount := concurrentPrinting(tasks, 2)
	fmt.Printf("print %d chars", totalCount)
}

sync.WaitGroup类型的结构体,声明后的零值就可以直接使用。

注意for循环中的运行在单独的goroutine中的匿名函数,如果在函数体中直接引用循环变量的话,拿到的可能全部都是最后一个变量,而作为函数的参数传递的则是正确地值。

WaitGroup的使用套路:

  1. wg.Add()
  2. go启动新的goroutine
  3. 在goroutine中完成任务时调用wg.Done()
  4. wg.Wait()等待所有goroutine运行结束

每一个并行任务都运行在独立的goroutine中,wg.Wait()也运行在独立的goroutine中,而主goroutine中range了一个channel,当这个channel关闭时,range结束。

使用tokens := make(chan struct{}, maxConcurrent)作为一个令牌池,拿到令牌后才能开始goroutine,使用完以后释放令牌,腾出空间以便开始下一个goroutine。

整个逻辑的思路分析:

  1. 有并发就要有goroutine
  2. 控制并发度就要用缓冲队列,类似token,拿到token才开新的goroutine,任务结束要释放token。
  3. 要等并发结束再返回就要用WaitGroup,提前记好数,结束一个任务就-1
  4. 要汇总结果就要用一个close掉的goroutine

相关文章