// readUnderlying performs the actual reading: it turns standard input into a
// stream of channel values. Each scanned line is sent on lines as a string;
// once scanning stops, the scanner's error (nil when no error occurred) is
// sent as the final value. The channel is never closed by this function.
func readUnderlying(lines chan interface{}) {
	scanner := bufio.NewScanner(os.Stdin)
	for more := scanner.Scan(); more; more = scanner.Scan() {
		lines <- scanner.Text()
	}
	// Final value: whatever the scanner reports after the loop ends.
	lines <- scanner.Err()
}
// read starts readUnderlying in its own goroutine and prints every value it
// delivers until stop is closed or receives a value.
//
// Returning from read does not stop the reader goroutine itself; it only
// stops consuming from it, so no further input is processed.
func read(stop chan struct{}) {
	input := make(chan interface{}) // values handed over by the reader goroutine
	go readUnderlying(input)
	for {
		select {
		case lineOrErr := <-input:
			// readUnderlying finishes by sending the scanner's error, which is
			// nil after a clean end of input. Skip it instead of printing a
			// stray "<nil>" line.
			if lineOrErr == nil {
				continue
			}
			fmt.Println(lineOrErr)
		case <-stop:
			return
		}
	}
}
// main demonstrates stopping the reader: input is echoed for the first
// window, then stop is closed and nothing is processed during the second.
func main() {
	const window = 20 * time.Second

	stop := make(chan struct{})
	go read(stop)

	// First window: whatever is typed gets printed.
	time.Sleep(window)
	close(stop)
	// Second window: read has returned, so input is no longer processed.
	time.Sleep(window)
}
// Signalling channels shared by the command runner and its reader goroutines.
// All three are unbuffered, so every send blocks until it is received.
var (
	// tearDown asks the command runner to close the pipes and shut down.
	tearDown = make(chan struct{})

	// stdoutDoneChan is signalled by stdoutRoutine when it returns.
	stdoutDoneChan = make(chan struct{})

	// stderrDoneChan is signalled by stderrRoutine when it returns.
	stderrDoneChan = make(chan struct{})
)
// main runs the command in the background, requests teardown one second
// later, and then waits so the shutdown messages can be observed.
func main() {
	const (
		beforeTearDown = 1 * time.Second
		afterTearDown  = 5 * time.Second
	)

	go runCommandThenCloseStdoutStderrPipe()

	// Request teardown after one second. That is earlier than the bash
	// script's `sleep 2`, so the second set of echo commands is never read.
	time.Sleep(beforeTearDown)
	tearDown <- struct{}{}

	// Keep main alive for a while; the program would otherwise exit as soon
	// as main returns, before the proof-of-concept output could be seen.
	time.Sleep(afterTearDown)
}
// runCommandThenCloseStdoutStderrPipe starts a bash child process, streams its
// stdout and stderr through stdoutRoutine and stderrRoutine, and then blocks
// until a value arrives on tearDown. On teardown it closes both pipes, which
// ends the Scan loops in the reader goroutines, waits for both of them to
// signal completion, and finally kills and reaps the child process.
//
// It panics if the pipes cannot be created or the command cannot be started.
func runCommandThenCloseStdoutStderrPipe() {
	cmd := exec.Command("bash", "-c", "echo Hello; echo 1>&2 World!; sleep 2; echo Closing; echo 1>&2 loop...")
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		panic(err)
	}
	stderr, err := cmd.StderrPipe()
	if err != nil {
		panic(err)
	}
	if err = cmd.Start(); err != nil {
		panic(err)
	}
	defer func() {
		// Kill returns an error when the child has already exited on its own.
		// That is a normal outcome, so report it rather than panicking.
		if err := cmd.Process.Kill(); err != nil {
			fmt.Println("kill:", err)
		}
		// Reap the child and release the resources held by cmd. The returned
		// error only reflects that the process was killed, so it is ignored.
		_ = cmd.Wait()
	}()
	go stdoutRoutine(stdout)
	go stderrRoutine(stderr)

	// Wait for the request to tear down the child routines.
	<-tearDown
	stdout.Close()
	stderr.Close()
	// Block until both reader goroutines have reported that they are done.
	<-stdoutDoneChan
	<-stderrDoneChan

	fmt.Println("All functions are now closed")
}
// stdoutRoutine prints every line read from stdout with an "OUT:" prefix.
// When scanning ends it prints a closing message and then signals
// stdoutDoneChan.
func stdoutRoutine(stdout io.ReadCloser) {
	// Deferred calls run last-in first-out: the message is printed first,
	// then the done signal is sent.
	defer func() { stdoutDoneChan <- struct{}{} }()
	defer fmt.Println("Closing stdoutRoutine")

	for sc := bufio.NewScanner(stdout); sc.Scan(); {
		// Replace this with whatever per-line processing is needed.
		fmt.Println("OUT:", sc.Text())
	}
}
// stderrRoutine prints every line read from stderr with an "ERR:" prefix.
// When scanning ends it prints a closing message and then signals
// stderrDoneChan.
func stderrRoutine(stderr io.ReadCloser) {
	// Deferred calls run last-in first-out: the message is printed first,
	// then the done signal is sent.
	defer func() { stderrDoneChan <- struct{}{} }()
	defer fmt.Println("Closing stderrRoutine")

	for sc := bufio.NewScanner(stderr); sc.Scan(); {
		fmt.Println("ERR:", sc.Text())
	}
}
2条答案
按热度按时间xqk2d5yq1#
通过多加一层间接（再包一层 goroutine 和通道），并且不再理会底层的读取，我们就可以停止处理输入。
w9apscun2#
我遇到过类似的问题（不过我扫描的是 `stdout` 和 `stderr`），我想让父级调用方关闭这些子 goroutine。我的解决办法是关闭 `stdout` 和 `stderr` 管道，这样 `Scan()` 循环随后就会结束。要关闭整个函数，只需在需要的地方向 `tearDown` 通道发送一个信号，它会停止两个子例程（`stdoutRoutine` 和 `stderrRoutine`），从而干净地停止我的所有函数。