我正在处理一个大约100 GB的大数据文件。这个大文件中的每一行都是一个JSON数据块,我想读取、压缩并存储在内存数据库中。
var wg sync.WaitGroup
for {
line, err := reader.ReadString('\n')
if err != nil {
break
}
go func(index int) {
wg.Add(1)
pr, pw := io.Pipe()
zw := lzw.NewWriter(pw, lzw.LSB, 8)
_, err := io.Copy(zw, strings.NewReader(line))
pw.Close()
zw.Close()
if err != nil {
fmt.Println(err.Error())
}
b, err := io.ReadAll(pr)
if err != nil {
fmt.Println(err.Error())
}
client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(b), time.Hour*1000)
pr.Close()
wg.Done()
}(index)
if index%10000 == 0 {
fmt.Println(index)
wg.Wait()
}
index += 1
}
然而,这段代码在处理完前10000行后就停止了。当我把wg.Add(1)
移到zw.Close()
之后时,它会继续处理其余的行(但变得不稳定)。如果没有lzw
和io.Pipe()
,当我试图以未压缩的方式存储确切的值时,一切都可以正常工作。
我不确定我是否没有正确使用WaitGroup
,或者有一些与io.Pipe()
相关的东西,我还不知道。
1条答案
按热度按时间pw9qyyiw1#
TLDR:
1-删除
pr, pw := io.Pipe()
使代码更加简单,因为它是多余的,尝试this:
2-您需要将
wg.Add(1)
放在go func(index int) {
之前:3-
wg.Wait()
逻辑:如果
index%10000 != 0
,那么最后一次迭代会发生什么,这里当err == io.EOF
时,你需要wg.Wait()
来连接所有的goroutine:4-您可以使用词法作用域(静态作用域)来限制某些变量的作用域,使代码更易于管理,并了解何时
Close
lzw.NewWriter
:5-务必检查错误,例如:
这是接近你的代码的工作版本--尝试一下this,看看并发逻辑会发生什么(不推荐,因为它有多余的goroutine和
io.Pipe
--只是工作:输出: