type Server struct {
s chan int
r chan int
c *sync.Cond
state uint32
}
const (
sNormal = 0
sQuitting = 1
sReconnecting = 2
)
func New() *Server {
s := &Server{
s: make(chan int),
r: make(chan int),
c: sync.NewCond(&sync.Mutex{}),
}
go s.sender()
// go s.receiver()
return s
}
func (s *Server) sender() {
//
for {
select {
case data := <-s.s:
//do stuff with data
default:
s.c.L.Lock()
L:
for {
switch atomic.LoadUint32(&s.state) {
case sNormal:
break L
case sReconnecting:
case sQuitting:
s.c.L.Unlock()
return
}
s.c.Wait()
}
s.c.L.Unlock()
}
}
}
//repeat for receiver
func (s *Server) Reconnect() {
var cannotReconnect bool
atomic.StoreUint32(&s.state, sReconnecting)
//keep trying to reconnect
if cannotReconnect {
atomic.StoreUint32(&s.state, sQuitting)
} else {
atomic.StoreUint32(&s.state, sNormal)
}
s.c.Broadcast()
}
L:
for {
select {
case _, ok := <-ch:
if !ok { //ch is closed //immediately return err
break L
}
default: //all other case not-ready: means nothing in ch for now
break L
}
}
5条答案
按热度按时间3wabscal1#
不可能清空没有循环的通道。如果你没有任何并发的接收者,那么你可以使用这个简单的循环:
如果你有并发的接收者,那么使用循环:
bmp9r5qi2#
你所描述的本质上是活泼的,因为可能有合法的请求重新连接的渠道。与其试图耗尽渠道,我建议跟踪时间。
在你的重新连接频道上,发布时间。完成重新连接后,请注意时间。在使用重新连接通道时,丢弃比上次重新连接时间更早的任何消息。
另一个实现这一点的锁步解决方案是将reconnect通道设置为bool。发布“true”以重新连接。当重新连接完成时,发布“false”。然后消费频道,直到发现“假的”
vxf3dgd43#
另一种方法是使用
sync.Cond
和atomic
,沿着如下:playground
fiei3ece4#
不能编辑
Simon Fox
的答案,所以在这里写:cause <-ch立即返回err。
更安全的版本(处理关闭的更改):
8gsdolmq5#
这听起来像是你想要一个重置goroutine而不是重置通道。它将具有来自发送复位信号的一侧的输入,以及到接收器的输出。当这个goroutine接收到一个重新连接的请求时,它将其传递给接收者。然后,它等待在第三个信道上从接收方接收回确认,丢弃在此期间接收到的任何重新连接请求。总共3个通道,1个输入,1个输出,1个确认。