Go语言 io.Pipe()导致WaitGroup阻塞

hgc7kmma  于 2022-12-20  发布在  Go
关注(0)|答案(1)|浏览(146)

我正在处理一个大约100 GB的大数据文件。这个大文件中的每一行都是一个JSON数据块,我想读取、压缩并存储在内存数据库中。

// Question code: read the input line by line, LZW-compress every line in its
// own goroutine, store the base64-encoded result via client.Set, and wait for
// the goroutines each time index is a multiple of 10000.
// NOTE(review): this is the version that hangs; the inline notes mark why.
var wg sync.WaitGroup
for {
    line, err := reader.ReadString('\n')
    if err != nil {
        // Any read error, including io.EOF, ends the loop without a final wg.Wait().
        break
    }
    go func(index int) {
        // NOTE(review): Add runs inside the new goroutine, so the wg.Wait()
        // below can execute before the counter has been incremented. Add
        // should be called before the go statement.
        wg.Add(1)
        pr, pw := io.Pipe()
        zw := lzw.NewWriter(pw, lzw.LSB, 8)
        // NOTE(review): io.Pipe is synchronous. A write that reaches pw blocks
        // until something reads pr, but the only read (io.ReadAll below) runs
        // later on this same goroutine, so such a write never returns and
        // wg.Done() is never reached.
        _, err := io.Copy(zw, strings.NewReader(line))
        // NOTE(review): pw is closed before zw. zw.Close() is what flushes the
        // remaining compressed data, and here it writes to an already closed
        // pipe; its error is discarded.
        pw.Close()
        zw.Close()
        // err here is still the result of io.Copy above.
        if err != nil {
            fmt.Println(err.Error())
        }
        b, err := io.ReadAll(pr)
        if err != nil {
            fmt.Println(err.Error())
        }
        // The key is the line index; the value is the base64 text of whatever was read from pr.
        client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(b), time.Hour*1000)
        pr.Close()
        wg.Done()
    }(index)
    // Every 10000 lines: print progress and wait for the goroutines started so far.
    if index%10000 == 0 {
        fmt.Println(index)
        wg.Wait()
    }
    index += 1
}

然而,这段代码在处理完前10000行后就停止了。当我把wg.Add(1)移到zw.Close()之后时,它会继续处理其余的行(但变得不稳定)。如果不使用lzw和io.Pipe(),而是直接存储未压缩的原始值,一切都可以正常工作。
我不确定我是否没有正确使用WaitGroup,或者有一些与io.Pipe()相关的东西,我还不知道。

pw9qyyiw

pw9qyyiw1#

TLDR:
1-删除pr, pw := io.Pipe(),因为它是多余的,去掉之后代码会更简单。
可以试试下面的代码:

// Body of the read loop (the enclosing for statement is not shown here):
// read one line, compress it into an in-memory buffer in a goroutine, and
// store the base64-encoded result.
line, err := reader.ReadString('\n')
if err == io.EOF {
    // End of input: wait for every goroutine still running before leaving the loop.
    wg.Wait()
    break
}
if err != nil {
    log.Fatal(err)
}
// Add is called before the go statement so that a later wg.Wait() cannot run
// before this goroutine has been counted.
wg.Add(1)
go func(index int) {
    // buf receives the compressed bytes directly; no io.Pipe is needed.
    var buf bytes.Buffer
    { // lexical scoping (static scoping)
        zw := lzw.NewWriter(&buf, lzw.LSB, 8)
        n, err := zw.Write([]byte(line)) // n, err := io.Copy(zw, strings.NewReader(line))
        if err != nil {
            log.Fatal(err)
        }
        // Guard against a short write: every byte of the line must have been accepted.
        if int(n) != len(line) {
            log.Fatal(n, len(line))
        }
        // It is the caller's responsibility to call Close on the WriteCloser when finished writing.
        if err = zw.Close(); err != nil {
            log.Fatal(err)
        }
    }
    // Each Set call gets its own context with a 100 ms timeout.
    ctx, cancelFunc := context.WithTimeout(context.Background(), 100*time.Millisecond)
    client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(buf.Bytes()), 1000*time.Hour)

    // Release the timeout context's resources, then mark this goroutine as finished.
    cancelFunc()
    wg.Done()
}(index)

// After every tenThousand lines, wait for the goroutines started so far.
if index%tenThousand == 0 {
    wg.Wait()
}

2-您需要将wg.Add(1)放在go func(index int) {之前:

wg.Add(1)
    go func(index int) {

3-wg.Wait()逻辑:

if index%10000 == 0 {
        fmt.Println(index)
        wg.Wait()
    }

如果最后一次迭代时index%10000 != 0,会发生什么?此时仍有goroutine没有被等待,因此当err == io.EOF时,需要调用wg.Wait()来等待所有goroutine结束:

if err == io.EOF {
    wg.Wait()
    fmt.Println("\n**** All done **** index =", index)
    break
}

4-您可以使用词法作用域(静态作用域)来限制某些变量的作用域,使代码更易于管理,并明确应在何时对lzw.NewWriter返回的写入器调用Close:

{ // lexical scoping (static scoping)
    zw := lzw.NewWriter(bufio.NewWriter(&buf), lzw.LSB, 8)
    n, err := io.Copy(zw, strings.NewReader(line))
    if err != nil {
        log.Fatal(err)
    }
    if int(n) != len(line) {
        log.Fatal(n, len(line))
    }
    // It is the caller's responsibility to call Close on the WriteCloser when finished writing.
    if err = zw.Close(); err != nil {
        log.Fatal(err)
    }
}

5-务必检查错误,例如:

if err = zw.Close(); err != nil {
    log.Fatal(err)
}

下面是一个与你的代码接近、可以正常运行的版本--运行它可以观察并发逻辑的执行过程(不推荐使用,因为它包含多余的goroutine和io.Pipe,只是能够工作而已):

package main

import (
    "bufio"
    "compress/lzw"
    "context"
    "encoding/base64"
    "fmt"
    "io"
    "log"
    "strings"
    "sync"
    "time"
)

// main reads newline-terminated records from file, LZW-compresses each record
// in its own goroutine, and passes the base64-encoded result to client.Set.
// After every tenThousand records it waits for the goroutines started so far,
// and it waits once more when reading stops. Progress messages are sent on msg.
func main() {
    index := 0
    client := &myClient{}
    reader := bufio.NewReader(file)
    // your code:
    var wg sync.WaitGroup
    for {
        index++
        line, err := reader.ReadString('\n')
        // ReadString can return data together with an error (for example a
        // last record that has no trailing newline), so the data is handled
        // before err is inspected.
        if line != "" {
            // Add before the go statement so that a later Wait cannot run
            // before this goroutine has been counted.
            wg.Add(1)
            go func(i int, line string) {
                defer wg.Done()
                msg <- fmt.Sprint(i, " Enter running ... ", time.Now())
                asyncReader, asyncWriter := io.Pipe() // make it async to read and write
                zipWriter := lzw.NewWriter(asyncWriter, lzw.LSB, 8)
                go func() { // async
                    // The pipe is synchronous, so the writing side needs its
                    // own goroutine while io.ReadAll below drains the reader.
                    _, err := io.Copy(zipWriter, strings.NewReader(line))
                    if err == nil {
                        // Close flushes the remaining compressed data; a
                        // failure here means the output is incomplete.
                        err = zipWriter.Close()
                    }
                    // Hand any failure to the reading side. With a nil error
                    // this is a plain close and io.ReadAll sees end of data.
                    _ = asyncWriter.CloseWithError(err)
                }()
                b, err := io.ReadAll(asyncReader)
                if err != nil {
                    log.Fatal(err)
                }
                client.Set(context.Background(), fmt.Sprintf("%d", i), base64.StdEncoding.EncodeToString(b), time.Hour*1000)
                _ = asyncReader.Close()
                // Artificial delay so the waiting behaviour is visible in the demo output.
                time.Sleep(1 * time.Second)
                msg <- fmt.Sprint(i, " Exit running ... ", time.Now())
            }(index, line)
        }
        if err != nil {
            msg <- fmt.Sprint(index, " Done not waiting with err: ", err, time.Now())
            wg.Wait() // break waiting // if index%tenThousand != 0
            break
        }

        msg <- fmt.Sprint(index, " ", index%tenThousand == 0, " after go call")
        if index%tenThousand == 0 {
            wg.Wait()
            msg <- fmt.Sprint("..", index, " Done waiting after go call. ", time.Now())
        }
    }
    msg <- "Bye forever."

    // All workers have already been waited for before the break above; this
    // Wait returns immediately and is kept as a safeguard.
    wg.Wait()
    // No more messages will be sent: close msg and wait for the printer to drain it.
    close(msg)
    wgMsg.Wait()
}

// tenThousand is the batch size: main waits for its goroutines each time the
// line index is a multiple of it, and init writes tenThousand+1 sample lines.
// It is set to 2 just for the Go Playground.
const tenThousand = 2

// myClient is a stub standing in for the real storage client in this demo.
type myClient struct{}

// Set stores nothing. It only prints the context's error when the context
// passed in has already been cancelled or has expired.
func (c *myClient) Set(ctx context.Context, a, b string, t time.Duration) {
    // fmt.Println("a =", a, ", b =", b, ", t =", t)
    if err := ctx.Err(); err != nil {
        fmt.Println(err)
    }
}

// file and myw are the read and write ends of an in-memory pipe: init writes
// sample lines to myw and main reads them back from file.
var file, myw = io.Pipe()

func init() {
    go func() {
        for i := 1; i <= tenThousand+1; i++ {
            fmt.Fprintf(myw, "%d text to compress aaaaaaaaaaaaaa\n", i)
        }
        myw.Close()
    }()
    wgMsg.Add(1)
    go func() {
        defer wgMsg.Done()
        for s := range msg {
            fmt.Println(s)
        }
    }()
}

// msg carries progress messages from main and its goroutines to the printer
// goroutine started in init; it is buffered with room for 100 messages.
var msg = make(chan string, 100)

// wgMsg lets main wait until the printer goroutine has finished reading msg.
var wgMsg sync.WaitGroup

输出:

相关问题