Go语言 停止bufio.带有停止通道的扫描仪

2sbarzqh  于 2023-09-28  发布在  Go
关注(0)|答案(2)|浏览(110)

我正在写一些东西,使用bufio.Scanneros.Stdin读取行,如下所示:

for s.scanner.Scan() {
  line := s.scanner.Text()
  // process line
}

这是在一个goroutine中运行的,我希望能够在chan struct{}关闭时停止它。然而,当Scan阻塞直到有另一行时,我不知道如何停止它,如果没有更多的输入,它将无限期地阻塞。
有人能给我指个方向吗?

xqk2d5yq

xqk2d5yq1#

通过创建多一个间接地址并忽略底层,我们可以停止。

// actual reading, converts input stream to a channel
func readUnderlying(lines chan interface{}) {
    s := bufio.NewScanner(os.Stdin)
    for s.Scan() {
        lines <- s.Text()
    }
    lines <- s.Err()
}

func read(stop chan struct{}) {
    input := make(chan interface{}) // input stream
    go readUnderlying(input) // go and read
    for {
        select { // read or close
        case lineOrErr := <-input:
            fmt.Println(lineOrErr)
        case <-stop:
            return
        }
    }
}

func main() {
    stop := make(chan struct{})
    go read(stop)

    // wait some to simulate blocking
    time.Sleep(time.Second * 20) // it will print what is given
    close(stop)
    time.Sleep(time.Second * 20) // stopped so no more processing
}
w9apscun

w9apscun2#

我有一个类似的问题(* 尽管我正在扫描stdoutstderr *),我想让父调用者关闭那些子go例程。我的解决方案是关闭stdoutstderr管道,这将随后结束Scan()循环。
为了关闭整个函数,我只需要从需要的地方传递一个通道调用到tearDown,这将停止两个子例程(stdoutRoutinestderrRoutine);因此清理停止了我所有的功能。

var (
    tearDown       = make(chan struct{})
    stdoutDoneChan = make(chan struct{})
    stderrDoneChan = make(chan struct{})
)

func main() {

  go runCommandThenCloseStdoutStderrPipe()

  // Send 'tearDown' channel command to close all functions after 1-second.
  // This will tear down the functions BEFORE the second set of BASH commands.
  time.Sleep(time.Second * 1)

  tearDown <- struct{}{}

  // Pad out some time to make sure main garbage collection doesn't automatically tear it down (allows code to show proof-of-concept)
  time.Sleep(time.Second * 5)
}

func runCommandThenCloseStdoutStderrPipe() {
  cmd := exec.Command("bash", "-c", "echo Hello; echo 1>&2 World!; sleep 2; echo Closing; echo 1>&2 loop...")

  stdout, err := cmd.StdoutPipe()
  if err != nil {
      panic(err)
  }
  stderr, err := cmd.StderrPipe()
  if err != nil {
      panic(err)
  }
  if err = cmd.Start(); err != nil {
      panic(err)
  }

  defer func() {
    if err = cmd.Process.Kill(); err != nil { panic(err) }
  }()

  go stdoutRoutine(stdout)
  go stderrRoutine(stderr)

  var exitLoop bool

  for !exitLoop {
    select {

    // Channel call to tear down children routines
    case <- tearDown:
      stdout.Close()
      stderr.Close()

      // Block to ensure those children close
      <- stdoutDoneChan
      <- stderrDoneChan
      exitLoop = true
    }
  }
  fmt.Println("All functions are now closed")
}

func stdoutRoutine(stdout io.ReadCloser) {

  defer func() {
    fmt.Println("Closing stdoutRoutine")
    stdoutDoneChan <- struct{}{}
  }()

  scanner := bufio.NewScanner(stdout)
  for scanner.Scan() {
    line := scanner.Text() // or whatever type of processing you need
    fmt.Println("OUT:", line)
  }
}

func stderrRoutine(stderr io.ReadCloser) {

  defer func() {
    fmt.Println("Closing stderrRoutine")
    stderrDoneChan <- struct{}{}
  }()

  scanner := bufio.NewScanner(stderr)
  for scanner.Scan() {
    line := scanner.Text()
    fmt.Println("ERR:", line)
  }
}

相关问题