GO语言:致命错误:所有的goroutine都睡着了-死锁

50pmv0ei  于 2023-05-11  发布在  Go
关注(0)|答案(4)|浏览(150)

下面的代码可以很好地处理硬编码的JSON数据,但是当我从文件中读取JSON数据时就不起作用了。使用sync.WaitGroup时出现fatal error: all goroutines are asleep - deadlock错误。

使用硬编码JSON数据的工作示例:

package main

import (
    "bytes"
    "fmt"
    "os/exec"
    "time"
)

func connect(host string) {
    cmd := exec.Command("ssh", host, "uptime")
    var out bytes.Buffer
    cmd.Stdout = &out
    err := cmd.Run()
    if err != nil {
        fmt.Println(err)
    }
    fmt.Printf("%s: %q\n", host, out.String())
    time.Sleep(time.Second * 2)
    fmt.Printf("%s: DONE\n", host)
}

func listener(c chan string) {
    for {
        host := <-c
        go connect(host)
    }
}

func main() {
    hosts := [2]string{"user1@111.79.154.111", "user2@111.79.190.222"}
    var c chan string = make(chan string)
    go listener(c)

    for i := 0; i < len(hosts); i++ {
        c <- hosts[i]
    }
    var input string
    fmt.Scanln(&input)
}

输出:

user@user-VirtualBox:~/go$ go run channel.go
user1@111.79.154.111: " 09:46:40 up 86 days, 18:16,  0 users,  load average: 5"
user2@111.79.190.222: " 09:46:40 up 86 days, 17:27,  1 user,  load average: 9"
user1@111.79.154.111: DONE
user2@111.79.190.222: DONE

不工作-阅读JSON数据文件的示例:

package main

import (
    "bytes"
    "fmt"
    "os/exec"
    "time"
    "encoding/json"
    "os"
    "sync"
)

func connect(host string) {
    cmd := exec.Command("ssh", host, "uptime")
    var out bytes.Buffer
    cmd.Stdout = &out
    err := cmd.Run()
    if err != nil {
        fmt.Println(err)
    }
    fmt.Printf("%s: %q\n", host, out.String())
    time.Sleep(time.Second * 2)
    fmt.Printf("%s: DONE\n", host)
}

func listener(c chan string) {
    for {
        host := <-c
        go connect(host)
    }
}

type Content struct {
    Username string `json:"username"`
    Ip       string `json:"ip"`
}

func main() {
    var wg sync.WaitGroup

    var source []Content
    var hosts []string
    data := json.NewDecoder(os.Stdin)
    data.Decode(&source)

    for _, value := range source {
        hosts = append(hosts, value.Username + "@" + value.Ip)
    }

    var c chan string = make(chan string)
    go listener(c)

    for i := 0; i < len(hosts); i++ {
        wg.Add(1)
        c <- hosts[i]
        defer wg.Done()
    }

    var input string
    fmt.Scanln(&input)

    wg.Wait()
}

输出

user@user-VirtualBox:~/go$ go run deploy.go < hosts.txt 
user1@111.79.154.111: " 09:46:40 up 86 days, 18:16,  0 users,  load average: 5"
user2@111.79.190.222: " 09:46:40 up 86 days, 17:27,  1 user,  load average: 9"
user1@111.79.154.111 : DONE
user2@111.79.190.222: DONE
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc210000068)
    /usr/lib/go/src/pkg/runtime/sema.goc:199 +0x30
sync.(*WaitGroup).Wait(0xc210047020)
    /usr/lib/go/src/pkg/sync/waitgroup.go:127 +0x14b
main.main()
    /home/user/go/deploy.go:64 +0x45a

goroutine 3 [chan receive]:
main.listener(0xc210038060)
    /home/user/go/deploy.go:28 +0x30
created by main.main
    /home/user/go/deploy.go:53 +0x30b
exit status 2
user@user-VirtualBox:~/go$

HOSTS.TXT

[
   {
      "username":"user1",
      "ip":"111.79.154.111"
   },
   {
      "username":"user2",
      "ip":"111.79.190.222"
   }
]
41ik7eoe

41ik7eoe1#

当main函数结束时,Go程序结束。

从语言规范
程序执行首先初始化main包,然后调用函数main。当该函数调用返回时,程序退出。它不会等待其他(非主)goroutine完成。
因此,你需要等待你的goroutines完成。常见的解决方案是使用sync.WaitGroup对象。
同步goroutine的最简单代码:

package main

import "fmt"
import "sync"

var wg sync.WaitGroup // 1

func routine() {
    defer wg.Done() // 3
    fmt.Println("routine finished")
}

func main() {
    wg.Add(1) // 2
    go routine() // *
    wg.Wait() // 4
    fmt.Println("main finished")
}

和同步多个goroutine

package main

import "fmt"
import "sync"

var wg sync.WaitGroup // 1

func routine(i int) {
    defer wg.Done() // 3
    fmt.Printf("routine %v finished\n", i)
}

func main() {
    for i := 0; i < 10; i++ {
        wg.Add(1) // 2
        go routine(i) // *
    }
    wg.Wait() // 4
    fmt.Println("main finished")
}

按执行顺序排列的WaitGroup用法。
1.全局变量的声明。使其全局化是使其对所有函数和方法可见的最简单方法。
1.增加计数器。这必须在main goroutine中完成,因为由于内存模型的保证,不能保证新启动的goroutine会在4之前执行。
1.减少计数器。这必须在goroutine的出口处完成。使用延迟调用,我们确保它将被调用时,函数结束,无论但无论如何结束。
1.等待计数器达到0。这必须在main goroutine中完成,以防止程序退出。

  • 在开始新的工作程序之前,对实际参数进行评估。因此,需要在wg.Add(1)之前显式地评估它们,以便可能的恐慌代码不会留下增加的计数器。

使用

param := f(x)
wg.Add(1)
go g(param)

而不是

wg.Add(1)
go g(f(x))
7vux5j2d

7vux5j2d2#

感谢您的非常好和详细的解释Grzegorz ur。我想指出的一点是,通常需要线程化的func不会在main()中,所以我们会有这样的东西:

package main

import (
    "bufio"
    "fmt"
    "io"
    "io/ioutil"
    "math/rand"
    "os"
    "reflect"
    "regexp"
    "strings"
    "sync"
    "time"
)

var wg sync.WaitGroup    // VERY IMP to declare this globally, other wise one   //would hit "fatal error: all goroutines are asleep - deadlock!"

func doSomething(arg1 arg1Type) {
     // cured cancer
}

func main() {
    r := rand.New(rand.NewSource(time.Now().UnixNano()))
    randTime := r.Intn(10)
    wg.Add(1)    
    go doSomething(randTime)
    wg.Wait()
    fmt.Println("Waiting for all threads to finish")
}

我想指出的是,wg的全局声明对于所有线程在main()之前完成非常重要

6qfn3psc

6qfn3psc3#

package main

/*
  Simulation for sending messages from threads for processing,
  and getting a response (processing result) to the thread
*/
import (
    "fmt"
    "math/rand"
    "time"
)

type (
    TChans []chan TMsgRec

    TMsgRec struct {
        name string //channel name
        rid  int    //-1 or  index of response channel in TChans
        msg  string // message
        note string // comment
    }

    TThRec struct { // for thread
        name string
        rid  int          // index of response channel in TChans (or -1)
        job  chan TMsgRec // chanel for send message to Receiver
        resp chan TMsgRec // response channel back to thread
    }
)

func main() {

    index := -1
    Chans := make(TChans, 100)
    index = NewChanIndex(&Chans)
    Job := Chans[index] // channel for send message from threads to Receiver

    index = NewChanIndex(&Chans) // channel index for response, for the thread "1th"
    go ping(TThRec{name: "1th", job: Job, rid: index, resp: Chans[index]})

    index = NewChanIndex(&Chans) // channel index for response, for the thread "2th"
    go ping(TThRec{name: "2th", job: Job, rid: index, resp: Chans[index]})

    Receiver(Job, Chans)
}

func Receiver(c chan TMsgRec, pChans TChans) {
    var v TMsgRec
    for {

        select {
        case v = <-c: // receive message
            {
                if v.rid > -1 {
                    //pChans[v.rid] <- TMsgRec{name: v.name, rid: -1, msg: fmt.Sprint(v.msg, ":receiver "), note: ""}
                    go worker(v, pChans[v.rid])
                }
            }
        default:
            {
                //fmt.Println("receiver")
                SleepM(2)
            }
        }
    }

}

func worker(v TMsgRec, r chan TMsgRec) {
    // simulation SQL query, or auther process
    SleepM(rand.Intn(50))
    v.msg = v.msg + ":worker"
    r <- v
}

func waitResponse(d chan TMsgRec, pTimeout int) (bool, TMsgRec) {
    var v TMsgRec
    for {
        select {
        case v = <-d:
            {
                return true, v
            }
        case <-time.After(10 * time.Second):
            {
                return false, v
            }
        }
    }
}

func ping(pParam TThRec) {
    SleepM(10)
    var v TMsgRec
    ok := true
    i := 0
    for i < 500 {
        if ok {
            ok = false
            pParam.job <- TMsgRec{name: pParam.name, rid: pParam.rid, msg: fmt.Sprint(i), note: ""}
            i++
        }
        if pParam.rid > -1 {
            if !ok {
                ok, v = waitResponse(pParam.resp, 10)
                if ok {
                    fmt.Println(v.name, v.msg)
                    SleepM(1)
                } else {
                    fmt.Println(pParam.name, "response timeout")
                }

            }
        } else {
            SleepM(1)
        }
    }
    fmt.Println(v.name, "-- end --")
}

func NewChanIndex(pC *TChans) int {
    for i, v := range *pC {
        if v == nil {
            (*pC)[i] = make(chan TMsgRec)
            return i
        }
    }
    return -1
}

func FreeRespChan(pC *TChans, pIndex int) {
    if (*pC)[pIndex] != nil {
        close((*pC)[pIndex]) //close channel
        (*pC)[pIndex] = nil
    }
}

func SleepM(pMilliSec int) { // sleep millisecounds
    time.Sleep(time.Duration(pMilliSec) * time.Millisecond)
}
vkc1a9a2

vkc1a9a24#

试试这个代码片段

package main

import (
    "bytes"
    "fmt"
    "os/exec"
    "time"
    "sync"
)

func connect(host string, wg *sync.WaitGroup) {
    defer wg.Done()
    cmd := exec.Command("ssh", host, "uptime")
    var out bytes.Buffer
    cmd.Stdout = &out
    err := cmd.Run()
    if err != nil {
        fmt.Println(err)
    }
    fmt.Printf("%s: %q\n", host, out.String())
    time.Sleep(time.Second * 2)
    fmt.Printf("%s: DONE\n", host)
}

func listener(c chan string,wg *sync.WaitGroup) {
    for {
        host,ok := <-c
        // check channel is closed or not
        if !ok{
            break
        }
        go connect(host)
    }

}

func main() {
    var wg sync.WaitGroup
    hosts := [2]string{"user1@111.79.154.111", "user2@111.79.190.222"}
    var c chan string = make(chan string)
    go listener(c)

    for i := 0; i < len(hosts); i++ {
        wg.Add(1)
        c <- hosts[i]
    }
    close(c)
    var input string
    fmt.Scanln(&input)
    wg.Wait()
}

相关问题