Go语言 当关闭缓冲通道时,是否应该排空它

gcuhipw9  于 2023-08-01  发布在  Go
关注(0)|答案(3)|浏览(130)

给定Go语言中一个(部分)填充的缓冲通道

ch := make(chan *MassiveStruct, n)
for i := 0; i < n; i++ {
    ch <- NewMassiveStruct()
}

字符串
在不知道读取器何时将从通道读取的情况下,在关闭通道(由写入器)时也排空通道(例如,数量有限,并且它们当前正忙碌)。那就是

close(ch)
for range ch {}


如果通道上有其他并发读取器,是否保证这样的循环结束?

上下文:一个具有固定数量的工作线程的队列服务,当服务停止时(但不一定立即进行GCed),它应该放弃处理任何排队的东西。所以我关闭,以表明工人的服务正在终止。我可以立即清空剩余的“队列”,让GC释放分配的资源,我可以读取并忽略worker中的值,我可以让通道运行在读取器中,并在写入器中将通道设置为nil,以便GC清理所有内容。我不知道哪种方式最干净。

xeufq47z

xeufq47z1#

这取决于你的程序,但一般来说,我倾向于说不(你不需要在关闭它之前清除通道):如果在关闭通道时通道中有项目,则仍在从通道阅读的任何读取器将接收项目,直到通道为空。
下面是一个例子:

package main

import (
    "sync"
    "time"
)

func main() {

    var ch = make(chan int, 5)
    var wg sync.WaitGroup
    wg.Add(1)

    for range make([]struct{}, 2) {
        go func() {
            for i := range ch {
                wg.Wait()
                println(i)
            }
        }()
    }

    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)

    wg.Done()
    time.Sleep(1 * time.Second)
}

字符串
在这里,程序将输出所有项,尽管在任何读者可以从通道读取之前通道被严格关闭。

vsdwdz23

vsdwdz232#

有更好的方法来实现你想要实现的目标。您当前的方法可能会导致丢弃一些记录,并随机处理其他记录(因为耗尽循环正在与所有消费者竞争)。这并没有真正解决目标。
你想要的是取消。下面是一个来自Go Concurrency Patterns: Pipelines and cancellation的示例

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

字符串
你给所有的goroutine传递一个done通道,当你想让它们都停止处理时,你就关闭它。如果您经常这样做,您可能会发现golang.org/x/net/context包很有用,它将这种模式形式化,并添加了一些额外的特性(如超时)。

eanckbw9

eanckbw93#

我觉得所提供的答案实际上并没有澄清很多,除了暗示既不需要排水也不需要关闭。因此,下面描述的上下文的解决方案在我看来很干净,它终止了worker并删除了对它们或有问题的通道的所有引用,因此,让GC清理通道及其内容:

type worker struct {
    submitted chan Task
    stop      chan bool
    p         *Processor
}

// executed in a goroutine
func (w *worker) run() {
    for {
        select {
        case task := <-w.submitted:
            if err := task.Execute(w.p); err != nil {
                logger.Error(err.Error())
            }
        case <-w.stop:
            logger.Warn("Worker stopped")
            return
        }
    }
}

func (p *Processor) Stop() {
    if atomic.CompareAndSwapInt32(&p.status, running, stopped) {
        for _, w := range p.workers {
            w.stop <- true
        }
        // GC all workers as soon as goroutines stop
        p.workers = nil
        // GC all published data when workers terminate
        p.submitted = nil
        // no need to do the following above:
        // close(p.submitted)
        // for range p.submitted {}
    }
}

字符串

相关问题