我目前正在做一个项目,需要通过网络与分布式系统的一些实体进行不同数据类型的通信,我正在使用ZMQ。
该项目的主要目标是有一个中心节点,服务客户端可以在任何时候连接。对于连接的每个客户端,中心节点应该管理两者之间的消息通信。
目前,所有的通信都是通过TCP进行的。
客户端需要随时发送和接收消息,因此它们是ZMQ_DEALER
类型的套接字,中心节点是ZMQ_ROUTER
。
最初,目标是来自某个客户端的一条消息到达其他客户端。这意味着其他客户端可以看到所有相同的数据。
我之所以使用异步客户机/服务器模式,是因为我对多个客户机以协作方式相互通信感兴趣,可能需要一个服务器代理或中间件。
我有一个ZMQ_DEALER
套接字客户端连接到ZMQ_ROUTER
套接字服务器
#include <zmq.hpp>
#include "zhelpers.hpp"
using namespace std;
int main(int argc, char *argv[])
{
zmq::context_t context(1);
zmq::socket_t client(context, ZMQ_DEALER);
const string endpoint = "tcp://localhost:5559";
client.setsockopt(ZMQ_IDENTITY, "PEER1", 5);
cout << "Connecting to ZMQ Network Manager " << endpoint << "..." << endl;
client.connect(endpoint);
for (int request = 0; request < 10; request++)
{
s_sendmore(client, "");
s_send(client, "Testing sending some data");
std::string string = s_recv(client);
std::cout << "Received reply " << request
<< " [" << string << "]" << std::endl;
}
}
在我的服务器代码中,我有一个ZMQ_ROUTER,它接收和管理消息,并将其绑定到一个良好的端口。这个服务器是用Python做的
import zmq
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
# Initialize a poll set
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
print("Creating Server Network Manager Router")
while True:
socks = dict(poller.poll())
if socks.get(frontend) == zmq.POLLIN:
message = frontend.recv_multipart()
print(message)
frontend.send_multipart(message)
在我的其他同行/客户端上,我有以下内容:
#include <zmq.hpp>
#include "zhelpers.hpp"
using namespace std;
int main (int argc, char *argv[])
{
zmq::context_t context(1);
zmq::socket_t peer2(context, ZMQ_DEALER);
const string endpoint = "tcp://localhost:5559";
peer2.setsockopt(ZMQ_IDENTITY, "PEER2", 5);
cout << "Connecting to ZMQ Network Manager " << endpoint << "..." << endl;
peer2.connect(endpoint);
//s_sendmore(peer2, "");
//s_send(peer2, "Probando");
//std::string string = s_recv(peer2);
//std::cout << "Received reply " << " [" << string << "]" << std::endl;
for (int request = 0; request < 10; request++)
{
s_sendmore(peer2, "");
s_send(peer2, "Probando");
std::string string = s_recv(peer2);
std::cout << "Received reply " << request
<< " [" << string << "]" << std::endl;
}
}
更新
但是我执行的每一个客户端,它们各自的消息都没有到达另一个对等客户端。消息到达ZMQ_ROUTER
,并返回到ZMQ_DEALER
发送源。
这是因为在接收时标识帧在ROUTER之前,并且消息通过ROUTER发送回来;在这里,它删除标识并使用该值将消息路由回相关的DEALER(according to the ZMQ_ROUTER section to the end page here)。
这是逻辑,我将我的DEALER
的标识发送到ROUTER
,ROUTER
获取该标识帧并将消息返回给我的DEALER
在第一个示例中,在我的实现中开始,我需要任何DEALER发送的一些消息,这将被任何其他DEALER可视化,而不管有多少DEALER(一个或多个)连接到ZMQ_ROUTER。在这个意义上…是否有必要与其他经销商或其他经销商的身份框架进行沟通?
如果我有DEALER A
,DEALER B
,和DEALER C
,和ROUTER
然后:DEALER A
发送消息...我希望来自经销商A的消息到达DEALER B
和DEALER C
以及其他可以加入我的会话对话的DEALERS
...
在这个想法的顺序,是必要的满足身份框架的DEALER B
和DEALER C
先前在DEALER A
侧,使这个消息到达他?
如何知道我的实现中存在的每个DEALER的标识框架?这是在路由器端进行的吗?我还没弄清楚
2条答案
按热度按时间ej83mcc01#
您可以让所有客户端在启动时发送“我在这里”消息。然后,中央服务器可以存储所有ID,参见图1。worker和路由器之间的初始通信如下:服务https://zguide.zeromq.org/docs/chapter3/#A-Load-Balancing-Message-Broker器将向所有当前已知的客户端发出任何接收到的消息。您应该添加一些心跳,以便检测断开连接的客户端,参见。https://zguide.zeromq.org/docs/chapter4/#Heartbeating。
然而,ZeroMQ已经带有这样的通信模式:
PUB
-SUB
。本质上,每个客户端都有一个DEALER
和一个SUB
套接字连接到服务器ROUTER
和PUB
套接字。服务器只是通过PUB
套接字将任何收到的消息发送给 * 所有 * 客户端。如果这对始发客户端来说是个问题,您可以在消息中包含客户端ID,这样每个客户端都可以用自己的ID过滤掉消息。另请参阅指南www.example.com中的此示例https://zguide.zeromq.org/docs/chapter5/#Getting-an-Out-of-Band-Snapshot另一个有趣的模式是从客户端重新发布更新:
这里使用
PUSH
--PULL
将更新发送到服务器。如果不需要来自服务器的回复消息,这是有意义的。如果您不需要该示例中的状态请求,则可以省略ROUTER
--DEALER
部分。这里是一个使用Python的简单实现。服务器监听PULL
套接字,并通过PUB
套接字发送所有内容:客户端监听
PUB
套接字一段时间。如果接收到消息,则将其记录。如果达到超时时间,则生成一条概率为1/10的消息:ar7v8xwq2#
(过早) 奖励的答案不符合定义的属性。
分布式系统需要同时智能和高效地运行,因为代理是分布式的,错误分析和部署的生产问题的分析/测试/调试都非常昂贵。
因此,复制/粘贴重复使用一个问题不兼容的想法不是实现前者的方法,后者就更少了。
那么,我们先来回顾一下效率:
客户端-
[A].send()
-s消息,O/P希望成为服务器端-[S].recv()
-ed,并重新广播给所有 * 其他 * 客户端-[B,C,...]
,除了[A]
-本身。最节省资源的方法是正确地配置基础设施工具,而不需要重新发明轮子和/或使用脆弱和性能破坏性的脚手架代码。
所以:
在客户端-
[*]
侧,最好使用下面概述的基本代理概念。更复杂的设置,例如使用Tkinter已经发展到包含在**.mainloop()
**软实时系统中的聪明的事件处理设施,更好,但在多个方面开始设计之争并不容易,所以让我们在此刻保持简单:服务器端可以避免 ALL 麻烦,无需任何特殊处理:
所有这些都在ZeroMQ基础设施中得到了很好的调整: