我是新去,我试图创建一个简单的聊天服务器,客户端可以广播消息到所有连接的客户端。
在我的服务器中,我有一个接受连接的goroutine(无限for循环),所有的连接都由一个通道接收。
go func() {
for {
conn, _ := listener.Accept()
ch <- conn
}
}()
然后,我为每一个连接的客户端启动一个处理程序(goroutine),在处理程序中,我尝试通过迭代通道向所有的连接进行广播。
for c := range ch {
conn.Write(msg)
}
但是,我不能广播,因为(我从阅读文档中想到)在迭代之前需要关闭通道。我不确定何时应该关闭通道,因为我想不断地接受新的连接,而关闭通道不允许我这样做。如果有人能帮助我,或者提供一种更好的方法来向所有连接的客户端广播消息,我将不胜感激。
6条答案
按热度按时间bgibtngc1#
你所做的是一个扇出模式,也就是说,多个端点监听一个输入源。这种模式的结果是,只要输入源中有消息,这些监听器中只有一个能够得到消息。唯一的例外是通道的
close
。这个close
将被所有的监听器识别,因此是一个“广播”。但你要做的是广播从连接读取的消息,所以我们可以这样做:
当监听程序的数量已知时
让每个工作人员收听专用广播信道,并将消息从主信道调度到每个专用广播信道。
然后我们可以有一群工人:
然后开始我们的监听:
还有一个调度员:
当监听程序的数量未知时
在这种情况下,上面给出的解决方案仍然有效。唯一的区别是,每当你需要一个新的worker时,你需要创建一个新的worker,启动它,然后把它推到
workers
切片中。但是这个方法需要一个线程安全的切片,它需要一个锁。其中一个实现可能如下所示:每当您要启动Worker时:
您的调度程序将更改为:
最后一句话:永远不要留下一个悬空的goroutine
其中一个良好做法是:永远不要留下一个悬空的goroutine,所以当你听完之后,你需要关闭所有的goroutine,这将通过
worker
中的quit
通道来完成:首先,我们需要创建一个全局
quit
信令通道:每当我们创建一个worker时,我们就将
globalQuit
通道分配给它作为它的退出信号:然后,当我们想要关闭所有工作线程时,只需执行以下操作:
由于
close
会被所有侦听的goroutine识别(这一点你已经理解了),所以所有的goroutine都会被返回。记住关闭你的调度器例程,但我会把它留给你:)zaq34kh62#
一个更优雅的解决方案是“代理”,客户端可以在其中订阅和取消订阅消息。
为了优雅地处理订阅和取消订阅,我们可以利用通道,这样接收和分发消息的代理的主循环可以使用单个
select
语句合并所有这些消息,并且同步是根据解决方案的性质给出的。另一个技巧是将订阅者存储在一个Map中,从我们用来向他们分发消息的通道Map。因此,使用通道作为Map中的键,然后添加和删除客户端是“死”简单的。这是可能的,因为通道值是可比较的,它们的比较是非常有效的,因为通道值是指向通道描述符的简单指针。
下面是一个简单的代理实现:
使用示例:
上面的输出将是(在Go Playground上试试):
改进
您可以考虑以下改进。这些改进可能有用,也可能不有用,这取决于您如何/如何使用代理。
Broker.Unsubscribe()
可以关闭消息通道,用信号通知不再在其上发送消息:这将允许客户端通过消息通道进行
range
,如下所示:然后,如果有人取消订阅此
msgCh
,如下所示:在处理完调用
Unsubscribe()
之前发送的所有消息后,上述range循环将终止。如果您希望客户端依赖于正在关闭的消息通道,并且代理的生存期比应用的生存期短,那么您还可以在停止代理时关闭所有订阅的客户端,方法如下:
qpgpyjmq3#
广播到一个通道切片并使用同步。互斥锁来管理通道添加和删除可能是最简单的方法在您的情况下。
下面是您可以在golang中对
broadcast
执行的操作:but5z9lq4#
这是一个迟来的答案,但我认为它可能会安抚一些好奇的读者。
Go通道在并发性方面受到广泛欢迎。
围棋界就死板地遵循着这样的说法:
不通过共享内存进行通信;而是通过通信来共享存储器。
我对此完全中立,我认为在广播方面应该考虑其他选项,而不是定义明确的
channels
。以下是我的看法:来自同步包的条件是widely overlooked。在非常相同的上下文中实现青铜人建议的braodcaster值得注意。
我很高兴听到icza建议使用频道并在频道上广播消息。我遵循同样的方法并使用sync的条件变量:
这是我们整个广播理念所依赖的主要结构。
下面,我为这个结构定义了一些行为。简单地说,订阅者应该能够被添加,删除,整个过程应该是可撤销的。
接下来,我可以很容易地使用它:
它应该以任意顺序打印如下内容:
在go playground上查看此内容
zfciruhq5#
再举一个简单例子:https://play.golang.org
输出:
agxfikkp6#
因为Go语言的通道遵循通信顺序进程(CSP)模式,所以通道是一个点对点的通信实体,每次交换都有一个写入者和一个读取者。
然而,每个通道 end 都可以在多个goroutine之间 * 共享 *,这样做是安全的--不存在危险的竞态条件。
因此,可以有多个作者共享写作端。和/或可以有多个读者共享阅读端。我在不同的答案中写了更多关于这一点的内容,其中包括例子。
如果你真的需要一个广播,你不能直接这样做,但是实现一个中间的goroutine并不难,它可以把一个值复制到一组输出通道中的每一个通道。