Go语言 如何使用频道广播消息

6xfqseft  于 2022-12-07  发布在  Go
关注(0)|答案(6)|浏览(180)

我是新去,我试图创建一个简单的聊天服务器,客户端可以广播消息到所有连接的客户端。
在我的服务器中,我有一个接受连接的goroutine(无限for循环),所有的连接都由一个通道接收。

go func() {
    for {
        conn, _ := listener.Accept()
        ch <- conn
        }
}()

然后,我为每一个连接的客户端启动一个处理程序(goroutine),在处理程序中,我尝试通过迭代通道向所有的连接进行广播。

for c := range ch {
    conn.Write(msg)
}

但是,我不能广播,因为(我从阅读文档中想到)在迭代之前需要关闭通道。我不确定何时应该关闭通道,因为我想不断地接受新的连接,而关闭通道不允许我这样做。如果有人能帮助我,或者提供一种更好的方法来向所有连接的客户端广播消息,我将不胜感激。

bgibtngc

bgibtngc1#

你所做的是一个扇出模式,也就是说,多个端点监听一个输入源。这种模式的结果是,只要输入源中有消息,这些监听器中只有一个能够得到消息。唯一的例外是通道的close。这个close将被所有的监听器识别,因此是一个“广播”。
但你要做的是广播从连接读取的消息,所以我们可以这样做:

当监听程序的数量已知时

让每个工作人员收听专用广播信道,并将消息从主信道调度到每个专用广播信道。

type worker struct {
    source chan interface{}
    quit chan struct{}
}

func (w *worker) Start() {
    w.source = make(chan interface{}, 10) // some buffer size to avoid blocking
    go func() {
        for {
            select {
            case msg := <-w.source
                // do something with msg
            case <-quit: // will explain this in the last section
                return
            }
        }
    }()
}

然后我们可以有一群工人:

workers := []*worker{&worker{}, &worker{}}
for _, worker := range workers { worker.Start() }

然后开始我们的监听:

go func() {
for {
    conn, _ := listener.Accept()
    ch <- conn
    }
}()

还有一个调度员:

go func() {
    for {
        msg := <- ch
        for _, worker := workers {
            worker.source <- msg
        }
    }
}()

当监听程序的数量未知时

在这种情况下,上面给出的解决方案仍然有效。唯一的区别是,每当你需要一个新的worker时,你需要创建一个新的worker,启动它,然后把它推到workers切片中。但是这个方法需要一个线程安全的切片,它需要一个锁。其中一个实现可能如下所示:

type threadSafeSlice struct {
    sync.Mutex
    workers []*worker
}

func (slice *threadSafeSlice) Push(w *worker) {
    slice.Lock()
    defer slice.Unlock()

    workers = append(workers, w)
}

func (slice *threadSafeSlice) Iter(routine func(*worker)) {
    slice.Lock()
    defer slice.Unlock()

    for _, worker := range workers {
        routine(worker)
    }
}

每当您要启动Worker时:

w := &worker{}
w.Start()
threadSafeSlice.Push(w)

您的调度程序将更改为:

go func() {
    for {
        msg := <- ch
        threadSafeSlice.Iter(func(w *worker) { w.source <- msg })
    }
}()

最后一句话:永远不要留下一个悬空的goroutine
其中一个良好做法是:永远不要留下一个悬空的goroutine,所以当你听完之后,你需要关闭所有的goroutine,这将通过worker中的quit通道来完成:
首先,我们需要创建一个全局quit信令通道:

globalQuit := make(chan struct{})

每当我们创建一个worker时,我们就将globalQuit通道分配给它作为它的退出信号:

worker.quit = globalQuit

然后,当我们想要关闭所有工作线程时,只需执行以下操作:

close(globalQuit)

由于close会被所有侦听的goroutine识别(这一点你已经理解了),所以所有的goroutine都会被返回。记住关闭你的调度器例程,但我会把它留给你:)

zaq34kh6

zaq34kh62#

一个更优雅的解决方案是“代理”,客户端可以在其中订阅和取消订阅消息。
为了优雅地处理订阅和取消订阅,我们可以利用通道,这样接收和分发消息的代理的主循环可以使用单个select语句合并所有这些消息,并且同步是根据解决方案的性质给出的。
另一个技巧是将订阅者存储在一个Map中,从我们用来向他们分发消息的通道Map。因此,使用通道作为Map中的键,然后添加和删除客户端是“死”简单的。这是可能的,因为通道值是可比较的,它们的比较是非常有效的,因为通道值是指向通道描述符的简单指针。
下面是一个简单的代理实现:

type Broker[T any] struct {
    stopCh    chan struct{}
    publishCh chan T
    subCh     chan chan T
    unsubCh   chan chan T
}

func NewBroker[T any]() *Broker[T] {
    return &Broker[T]{
        stopCh:    make(chan struct{}),
        publishCh: make(chan T, 1),
        subCh:     make(chan chan T, 1),
        unsubCh:   make(chan chan T, 1),
    }
}

func (b *Broker[T]) Start() {
    subs := map[chan T]struct{}{}
    for {
        select {
        case <-b.stopCh:
            return
        case msgCh := <-b.subCh:
            subs[msgCh] = struct{}{}
        case msgCh := <-b.unsubCh:
            delete(subs, msgCh)
        case msg := <-b.publishCh:
            for msgCh := range subs {
                // msgCh is buffered, use non-blocking send to protect the broker:
                select {
                case msgCh <- msg:
                default:
                }
            }
        }
    }
}

func (b *Broker[T]) Stop() {
    close(b.stopCh)
}

func (b *Broker[T]) Subscribe() chan T {
    msgCh := make(chan T, 5)
    b.subCh <- msgCh
    return msgCh
}

func (b *Broker[T]) Unsubscribe(msgCh chan T) {
    b.unsubCh <- msgCh
}

func (b *Broker[T]) Publish(msg T) {
    b.publishCh <- msg
}

使用示例:

func main() {
    // Create and start a broker:
    b := NewBroker[string]()
    go b.Start()

    // Create and subscribe 3 clients:
    clientFunc := func(id int) {
        msgCh := b.Subscribe()
        for {
            fmt.Printf("Client %d got message: %v\n", id, <-msgCh)
        }
    }
    for i := 0; i < 3; i++ {
        go clientFunc(i)
    }

    // Start publishing messages:
    go func() {
        for msgId := 0; ; msgId++ {
            b.Publish(fmt.Sprintf("msg#%d", msgId))
            time.Sleep(300 * time.Millisecond)
        }
    }()

    time.Sleep(time.Second)
}

上面的输出将是(在Go Playground上试试):

Client 2 got message: msg#0
Client 0 got message: msg#0
Client 1 got message: msg#0
Client 2 got message: msg#1
Client 0 got message: msg#1
Client 1 got message: msg#1
Client 1 got message: msg#2
Client 2 got message: msg#2
Client 0 got message: msg#2
Client 2 got message: msg#3
Client 0 got message: msg#3
Client 1 got message: msg#3

改进

您可以考虑以下改进。这些改进可能有用,也可能不有用,这取决于您如何/如何使用代理。
Broker.Unsubscribe()可以关闭消息通道,用信号通知不再在其上发送消息:

func (b *Broker[T]) Unsubscribe(msgCh chan T) {
    b.unsubCh <- msgCh
    close(msgCh)
}

这将允许客户端通过消息通道进行range,如下所示:

msgCh := b.Subscribe()
for msg := range msgCh {
    fmt.Printf("Client %d got message: %v\n", id, msg)
}

然后,如果有人取消订阅此msgCh,如下所示:

b.Unsubscribe(msgCh)

在处理完调用Unsubscribe()之前发送的所有消息后,上述range循环将终止。
如果您希望客户端依赖于正在关闭的消息通道,并且代理的生存期比应用的生存期短,那么您还可以在停止代理时关闭所有订阅的客户端,方法如下:

case <-b.stopCh:
    for msgCh := range subs {
        close(msgCh)
    }
    return
qpgpyjmq

qpgpyjmq3#

广播到一个通道切片并使用同步。互斥锁来管理通道添加和删除可能是最简单的方法在您的情况下。
下面是您可以在golang中对broadcast执行的操作:

  • 您可以使用sync.Cond.广播共享状态更改。这种方式不需要任何alloc once设置,但您不能添加超时功能或使用其他通道。
  • 您可以通过关闭旧通道广播共享状态更改,并创建新通道和sync.Mutex。这样,每个状态更改都有一个分配,但您可以添加超时功能并使用另一个通道。
  • 你可以广播到函数回调的一个切片,并使用sync.mutex来管理它们。调用者可以做通道的事情。这样每个调用者有多个alloc,并与另一个通道一起工作。
  • 你可以广播到通道的一个片段,并使用sync.mutex来管理它们。这样每个调用者有多个分配,并与另一个通道一起工作。
  • 你可以广播到一个分片sync.WaitGroup并使用sync.Mutex来管理它们。
but5z9lq

but5z9lq4#

这是一个迟来的答案,但我认为它可能会安抚一些好奇的读者。
Go通道在并发性方面受到广泛欢迎。

围棋界就死板地遵循着这样的说法:

不通过共享内存进行通信;而是通过通信来共享存储器。
我对此完全中立,我认为在广播方面应该考虑其他选项,而不是定义明确的channels
以下是我的看法:来自同步包的条件是widely overlooked。在非常相同的上下文中实现青铜人建议的braodcaster值得注意。
我很高兴听到icza建议使用频道并在频道上广播消息。我遵循同样的方法并使用sync的条件变量:

// Broadcaster is the struct which encompasses broadcasting
type Broadcaster struct {
    cond        *sync.Cond
    subscribers map[interface{}]func(interface{})
    message     interface{}
    running     bool
}

这是我们整个广播理念所依赖的主要结构。
下面,我为这个结构定义了一些行为。简单地说,订阅者应该能够被添加,删除,整个过程应该是可撤销的。

// SetupBroadcaster gives the broadcaster object to be used further in messaging
    func SetupBroadcaster() *Broadcaster {
    
        return &Broadcaster{
            cond:        sync.NewCond(&sync.RWMutex{}),
            subscribers: map[interface{}]func(interface{}){},
        }
    }
    
    // Subscribe let others enroll in broadcast event!
    func (b *Broadcaster) Subscribe(id interface{}, f func(input interface{})) {
    
        b.subscribers[id] = f
    }
    
    // Unsubscribe stop receiving broadcasting
    func (b *Broadcaster) Unsubscribe(id interface{}) {
        b.cond.L.Lock()
        delete(b.subscribers, id)
        b.cond.L.Unlock()
    }
    
    // Publish publishes the message
    func (b *Broadcaster) Publish(message interface{}) {
        go func() {
            b.cond.L.Lock()
    
            b.message = message
            b.cond.Broadcast()
            b.cond.L.Unlock()
        }()
    }
    
    // Start the main broadcasting event
    func (b *Broadcaster) Start() {
        b.running = true
        for b.running {
            b.cond.L.Lock()
            b.cond.Wait()
            go func() {
                for _, f := range b.subscribers {
                    f(b.message) // publishes the message
                }
            }()
            b.cond.L.Unlock()
        }
    
    }
    
    // Stop broadcasting event
    func (b *Broadcaster) Stop() {
        b.running = false
    }

接下来,我可以很容易地使用它:

messageToaster := func(message interface{}) {
        fmt.Printf("[New Message]: %v\n", message)
    }
    unwillingReceiver := func(message interface{}) {
        fmt.Println("Do not disturb!")
    }
    broadcaster := SetupBroadcaster()
    broadcaster.Subscribe(1, messageToaster)
    broadcaster.Subscribe(2, messageToaster)
    broadcaster.Subscribe(3, unwillingReceiver)

    go broadcaster.Start()

    broadcaster.Publish("Hello!")

    time.Sleep(time.Second)
    broadcaster.Unsubscribe(3)
    broadcaster.Publish("Goodbye!")

它应该以任意顺序打印如下内容:

[New Message]: Hello!
Do not disturb!
[New Message]: Hello!
[New Message]: Goodbye!
[New Message]: Goodbye!

go playground上查看此内容

zfciruhq

zfciruhq5#

再举一个简单例子:https://play.golang.org

type Broadcaster struct {
    mu      sync.Mutex
    clients map[int64]chan struct{}
}

func NewBroadcaster() *Broadcaster {
    return &Broadcaster{
        clients: make(map[int64]chan struct{}),
    }
}

func (b *Broadcaster) Subscribe(id int64) (<-chan struct{}, error) {
    defer b.mu.Unlock()
    b.mu.Lock()
    s := make(chan struct{}, 1)

    if _, ok := b.clients[id]; ok {
        return nil, fmt.Errorf("signal %d already exist", id)
    }

    b.clients[id] = s

    return b.clients[id], nil
}

func (b *Broadcaster) Unsubscribe(id int64) {
    defer b.mu.Unlock()
    b.mu.Lock()
    if _, ok := b.clients[id]; ok {
        close(b.clients[id])
    }

    delete(b.clients, id)
}

func (b *Broadcaster) broadcast() {
    defer b.mu.Unlock()
    b.mu.Lock()
    for k := range b.clients {
        if len(b.clients[k]) == 0 {
            b.clients[k] <- struct{}{}
        }
    }
}

type testClient struct {
    name     string
    signal   <-chan struct{}
    signalID int64
    brd      *Broadcaster
}

func (c *testClient) doWork() {
    i := 0
    for range c.signal {
        fmt.Println(c.name, "do work", i)
        if i > 2 {
            c.brd.Unsubscribe(c.signalID)
            fmt.Println(c.name, "unsubscribed")
        }
        i++
    }
    fmt.Println(c.name, "done")
}

func main() {
    var err error
    brd := NewBroadcaster()

    clients := make([]*testClient, 0)

    for i := 0; i < 3; i++ {
        c := &testClient{
            name:     fmt.Sprint("client:", i),
            signalID: time.Now().UnixNano()+int64(i), // +int64(i) for play.golang.org
            brd:      brd,
        }
        c.signal, err = brd.Subscribe(c.signalID)
        if err != nil {
            log.Fatal(err)
        }

        clients = append(clients, c)
    }

    for i := 0; i < len(clients); i++ {
        go clients[i].doWork()
    }

    for i := 0; i < 6; i++ {
        brd.broadcast()
        time.Sleep(time.Second)
    }
}

输出:

client:0 do work 0
client:2 do work 0
client:1 do work 0
client:2 do work 1
client:0 do work 1
client:1 do work 1
client:2 do work 2
client:0 do work 2
client:1 do work 2
client:2 do work 3
client:2 unsubscribed
client:2 done
client:0 do work 3
client:0 unsubscribed
client:0 done
client:1 do work 3
client:1 unsubscribed
client:1 done
agxfikkp

agxfikkp6#

因为Go语言的通道遵循通信顺序进程(CSP)模式,所以通道是一个点对点的通信实体,每次交换都有一个写入者和一个读取者。
然而,每个通道 end 都可以在多个goroutine之间 * 共享 *,这样做是安全的--不存在危险的竞态条件。
因此,可以有多个作者共享写作端。和/或可以有多个读者共享阅读端。我在不同的答案中写了更多关于这一点的内容,其中包括例子。
如果你真的需要一个广播,你不能直接这样做,但是实现一个中间的goroutine并不难,它可以把一个值复制到一组输出通道中的每一个通道。

相关问题