Go语言 如何明确清空一个通道?

zvokhttg  于 2023-05-11  发布在  Go
关注(0)|答案(5)|浏览(265)

简短版本:有没有一种方法可以在不重新创建或循环的情况下清空go通道?
原因:我使用两个通道来发送和接收数据,并且我有一个额外的通道来表示需要重新连接。
现在,当传输已被重置/重新连接,我想‘清空‘额外的通道,以确保没有任何挥之不去的其他重置请求,这将导致事情再次重新连接。

3wabscal

3wabscal1#

不可能清空没有循环的通道。如果你没有任何并发的接收者,那么你可以使用这个简单的循环:

for len(ch) > 0 {
  <-ch
}

如果你有并发的接收者,那么使用循环:

L:
for {
    select {
    case <-c:
    default:
       break L
    }
}
bmp9r5qi

bmp9r5qi2#

你所描述的本质上是活泼的,因为可能有合法的请求重新连接的渠道。与其试图耗尽渠道,我建议跟踪时间。
在你的重新连接频道上,发布时间。完成重新连接后,请注意时间。在使用重新连接通道时,丢弃比上次重新连接时间更早的任何消息。
另一个实现这一点的锁步解决方案是将reconnect通道设置为bool。发布“true”以重新连接。当重新连接完成时,发布“false”。然后消费频道,直到发现“假的”

vxf3dgd4

vxf3dgd43#

另一种方法是使用sync.Condatomic,沿着如下:

type Server struct {
    s     chan int
    r     chan int
    c     *sync.Cond
    state uint32
}

const (
    sNormal       = 0
    sQuitting     = 1
    sReconnecting = 2
)

func New() *Server {
    s := &Server{
        s: make(chan int),
        r: make(chan int),
        c: sync.NewCond(&sync.Mutex{}),
    }
    go s.sender()
    // go s.receiver()
    return s
}
func (s *Server) sender() {
    //
    for {
        select {
        case data := <-s.s:
        //do stuff with data
        default:
            s.c.L.Lock()
        L:
            for {
                switch atomic.LoadUint32(&s.state) {
                case sNormal:
                    break L
                case sReconnecting:
                case sQuitting:
                    s.c.L.Unlock()
                    return
                }
                s.c.Wait()
            }
            s.c.L.Unlock()
        }
    }
}

//repeat for receiver

func (s *Server) Reconnect() {
    var cannotReconnect bool
    atomic.StoreUint32(&s.state, sReconnecting)
    //keep trying to reconnect
    if cannotReconnect {
        atomic.StoreUint32(&s.state, sQuitting)
    } else {
        atomic.StoreUint32(&s.state, sNormal)
    }
    s.c.Broadcast()
}

playground

fiei3ece

fiei3ece4#

不能编辑Simon Fox的答案,所以在这里写:

  • 如果改变关闭,将得到无限循环:

cause <-ch立即返回err。
更安全的版本(处理关闭的更改):

L:
    for {
        select {
        case _, ok := <-ch:
            if !ok { //ch is closed //immediately return err
                break L
            }
        default: //all other case not-ready: means nothing in ch for now
            break L
        }
    }
8gsdolmq

8gsdolmq5#

这听起来像是你想要一个重置goroutine而不是重置通道。它将具有来自发送复位信号的一侧的输入,以及到接收器的输出。当这个goroutine接收到一个重新连接的请求时,它将其传递给接收者。然后,它等待在第三个信道上从接收方接收回确认,丢弃在此期间接收到的任何重新连接请求。总共3个通道,1个输入,1个输出,1个确认。

相关问题