c++ ZMQ经销商-路由器通信

piok6c0g  于 2023-06-07  发布在  其他
关注(0)|答案(2)|浏览(146)

我目前正在做一个项目,需要通过网络与分布式系统的一些实体进行不同数据类型的通信,我正在使用ZMQ。
该项目的主要目标是有一个中心节点,服务客户端可以在任何时候连接。对于连接的每个客户端,中心节点应该管理两者之间的消息通信。
目前,所有的通信都是通过TCP进行的。
客户端需要随时发送和接收消息,因此它们是ZMQ_DEALER类型的套接字,中心节点是ZMQ_ROUTER
最初,目标是来自某个客户端的一条消息到达其他客户端。这意味着其他客户端可以看到所有相同的数据。
我之所以使用异步客户机/服务器模式,是因为我对多个客户机以协作方式相互通信感兴趣,可能需要一个服务器代理或中间件。
我有一个ZMQ_DEALER套接字客户端连接到ZMQ_ROUTER套接字服务器

#include <zmq.hpp>
#include "zhelpers.hpp"
using namespace std;

int main(int argc, char *argv[])
{

    zmq::context_t context(1);
    zmq::socket_t client(context, ZMQ_DEALER);

    const string endpoint = "tcp://localhost:5559";

    client.setsockopt(ZMQ_IDENTITY, "PEER1", 5);
    cout << "Connecting to ZMQ Network Manager " << endpoint << "..." << endl;
    client.connect(endpoint);
    for (int request = 0; request < 10; request++)
    {

        s_sendmore(client, "");
        s_send(client, "Testing sending some data");

        std::string string = s_recv(client);

        std::cout << "Received reply " << request
                  << " [" << string << "]" << std::endl;
    }
}

在我的服务器代码中,我有一个ZMQ_ROUTER,它接收和管理消息,并将其绑定到一个良好的端口。这个服务器是用Python做的

import zmq
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")

# Initialize a poll set
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)

print("Creating Server Network Manager Router")

while True:
    socks = dict(poller.poll())

    if socks.get(frontend) == zmq.POLLIN:
        message = frontend.recv_multipart()
        print(message)
        frontend.send_multipart(message)

在我的其他同行/客户端上,我有以下内容:

#include <zmq.hpp>
#include "zhelpers.hpp"
using namespace std;

int main (int argc, char *argv[])
{

    zmq::context_t context(1);
    zmq::socket_t peer2(context, ZMQ_DEALER);

    const string endpoint = "tcp://localhost:5559";

    peer2.setsockopt(ZMQ_IDENTITY, "PEER2", 5);
    cout << "Connecting to ZMQ Network Manager " << endpoint << "..." << endl;
    peer2.connect(endpoint);
    //s_sendmore(peer2, "");
    //s_send(peer2, "Probando");

    //std::string string = s_recv(peer2);

    //std::cout << "Received reply " << " [" << string << "]" << std::endl;

    for (int request = 0; request < 10; request++)
    {

        s_sendmore(peer2, "");
        s_send(peer2, "Probando");

        std::string string = s_recv(peer2);

        std::cout << "Received reply " << request
                  << " [" << string << "]" << std::endl;
    }

}

更新

但是我执行的每一个客户端,它们各自的消息都没有到达另一个对等客户端。消息到达ZMQ_ROUTER,并返回到ZMQ_DEALER发送源。

这是因为在接收时标识帧在ROUTER之前,并且消息通过ROUTER发送回来;在这里,它删除标识并使用该值将消息路由回相关的DEALER(according to the ZMQ_ROUTER section to the end page here)。
这是逻辑,我将我的DEALER的标识发送到ROUTERROUTER获取该标识帧并将消息返回给我的DEALER
在第一个示例中,在我的实现中开始,我需要任何DEALER发送的一些消息,这将被任何其他DEALER可视化,而不管有多少DEALER(一个或多个)连接到ZMQ_ROUTER。在这个意义上…是否有必要与其他经销商或其他经销商的身份框架进行沟通?
如果我有DEALER ADEALER B,和DEALER C,和ROUTER
然后:
DEALER A发送消息...我希望来自经销商A的消息到达DEALER BDEALER C以及其他可以加入我的会话对话的DEALERS...
在这个想法的顺序,是必要的满足身份框架的DEALER BDEALER C先前在DEALER A侧,使这个消息到达他?
如何知道我的实现中存在的每个DEALER的标识框架?这是在路由器端进行的吗?我还没弄清楚

ej83mcc0

ej83mcc01#

您可以让所有客户端在启动时发送“我在这里”消息。然后,中央服务器可以存储所有ID,参见图1。worker和路由器之间的初始通信如下:服务https://zguide.zeromq.org/docs/chapter3/#A-Load-Balancing-Message-Broker器将向所有当前已知的客户端发出任何接收到的消息。您应该添加一些心跳,以便检测断开连接的客户端,参见。https://zguide.zeromq.org/docs/chapter4/#Heartbeating。
然而,ZeroMQ已经带有这样的通信模式:PUB - SUB。本质上,每个客户端都有一个DEALER和一个SUB套接字连接到服务器ROUTERPUB套接字。服务器只是通过PUB套接字将任何收到的消息发送给 * 所有 * 客户端。如果这对始发客户端来说是个问题,您可以在消息中包含客户端ID,这样每个客户端都可以用自己的ID过滤掉消息。另请参阅指南www.example.com中的此示例https://zguide.zeromq.org/docs/chapter5/#Getting-an-Out-of-Band-Snapshot
另一个有趣的模式是从客户端重新发布更新:

这里使用PUSH--PULL将更新发送到服务器。如果不需要来自服务器的回复消息,这是有意义的。如果您不需要该示例中的状态请求,则可以省略ROUTER--DEALER部分。这里是一个使用Python的简单实现。服务器监听PULL套接字,并通过PUB套接字发送所有内容:

import zmq

def main():
    # context and sockets
    ctx = zmq.Context()
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:5557")
    collector = ctx.socket(zmq.PULL)
    collector.bind("tcp://*:5558")

    while True:
        message = collector.recv()
        print "I: publishing update %s" % message
        publisher.send(message)

if __name__ == '__main__':
    main()

客户端监听PUB套接字一段时间。如果接收到消息,则将其记录。如果达到超时时间,则生成一条概率为1/10的消息:

import random
import time

import zmq

def main():

    # Prepare our context and subscriber
    ctx = zmq.Context()
    subscriber = ctx.socket(zmq.SUB)
    subscriber.setsockopt(zmq.SUBSCRIBE, '')
    subscriber.connect("tcp://localhost:5557")
    publisher = ctx.socket(zmq.PUSH)
    publisher.connect("tcp://localhost:5558")

    random.seed(time.time())
    while True:
        if subscriber.poll(100) & zmq.POLLIN:
            message = subscriber.recv()
            print "I: received message %s" % message
        else:
            rand = random.randint(1, 100)
            if rand < 10:
                publisher.send("%d" % rand)
                print "I: sending message %d" % rand

if __name__ == '__main__':
    main()
ar7v8xwq

ar7v8xwq2#

(过早) 奖励的答案不符合定义的属性。

分布式系统需要同时智能高效地运行,因为代理是分布式的,错误分析和部署的生产问题的分析/测试/调试都非常昂贵。
因此,复制/粘贴重复使用一个问题不兼容的想法不是实现前者的方法,后者就更少了。

那么,我们先来回顾一下效率:

客户端-[A].send()-s消息,O/P希望成为服务器端-[S].recv()-ed,并重新广播给所有 * 其他 * 客户端-[B,C,...],除了[A]-本身。
最节省资源的方法是正确地配置基础设施工具,而不需要重新发明轮子和/或使用脆弱和性能破坏性的脚手架代码。
所以:
在客户端-[*]侧,最好使用下面概述的基本代理概念。更复杂的设置,例如使用Tkinter已经发展到包含在**.mainloop()**软实时系统中的聪明的事件处理设施,更好,但在多个方面开始设计之争并不容易,所以让我们在此刻保持简单:

zmq_VERSION      = zmq.zmq_version_info()
anAgentsIDENTITY = whateverHashOrHumanReadableSTRING
notMINE          = anAgentsIDENTITY

if     zmq_VERSION[0] < 4:
           print "ZMQ{0:} ver < than expected, will exit".format( zmq_VERSION )
aCTX = zmq.Context( 2 )                        # if performance boosting is needed

#SUB ---------------------------------------------------------------------------
aSUB = aCTX.socket( zmq.SUB )
aSUB.setsockopt(    zmq.LINGER,          0 )   # protect your agent
aSUB.setsockopt(    zmq.MAXMSGSIZE,      m )   # protect your agent from DoS
aSUB.setsockopt(    zmq.AFFINITY,        1 )   # protect your server resources
aSUB.setsockopt(    zmq.HEARTBEAT_IVL,   ivl ) #     set server helping Heartbeats
aSUB.setsockopt(    zmq.HEARTBEAT_TTL,   ttl ) #     set server helping Heartbeats
aSUB.setsockopt(    zmq.INVERT_MATCHING, 1 )   #   avoid server sending data back
aSUB.setsockopt(    zmq.SUBSCRIBE,       notMINE )  #  NEVER .recv()-s  data back
...
#SUB PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST

aSUB.connect(      "tcp://localhost:5557" )

#PUSH --------------------------------------------------------------------------
aPUSH = aCTX.socket( zmq.PUSH )
...
#PUSH PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST

#main loop ---------------------------------------------------------------------
pass; notSoftFLAG = True; anAgentSignsWithIdentityPREFIX = anAgentsIDENTITY
while notSoftFLAG:

    if aReasonToSendSomethingToServer:
       aPUSH.send( anAgentSignsWithIdentityPREFIX
                 + ":::"
                 + aMsgPAYLOAD,
                   zmq.DONTWAIT
                   )                          # inspect ZMQError
       ...
       pass

    if aSUB.poll( 100 ):
       message = aSUB.recv( zmq.DONTWAIT )    #  NEVER .recv()-s own data back
       ...
       pass

    if aReasonToFlagLoopEXIT:
       notSoftFLAG = False
       ...
       pass

    if ...:
       ...
       pass

#main loop ---------------------------------------------------------------------
pass

#########
# ALWAYS:
#          better using context-aware try:/except:/finally:

aRetCODE = [ aSOCK.close() for aSOCK in ( aSUB, aPUSH, ) ]
...

aCTX.term()
#   .term()
#########

服务器端可以避免 ALL 麻烦,无需任何特殊处理:

所有这些都在ZeroMQ基础设施中得到了很好的调整:

pass;  zmq_VERSION = zmq.zmq_version_info()
if     zmq_VERSION[0] < 4:
           print "ZMQ{0:} ver < than expected, will exit".format( zmq_VERSION )

aCTX = zmq.Context( 2 )                        # if performance boosting is needed

#PUB ---------------------------------------------------------------------------
aPUB = aCTX.socket( zmq.PUB )
aPUB.setsockopt(    zmq.LINGER,          0 )   # protect your server
aPUB.setsockopt(    zmq.MAXMSGSIZE,      m )   # protect your server from DoS
aPUB.setsockopt(    zmq.AFFINITY,        3 )   # protect your server resources
aPUB.setsockopt(    zmq.HEARTBEAT_IVL,   ivl ) #     server L3-helper Heartbeats
aPUB.setsockopt(    zmq.HEARTBEAT_TTL,   ttl ) #     server L3-helper Heartbeats
aPUB.setsockopt(    zmq.INVERT_MATCHING, 1 )   #   avoid server sending data back
aPUB.setsockopt(    zmq.IMMEDIATE,       1 )   # avoid Queueing for dead-ends
aPUB.setsockopt(    zmq.TOS,             tos ) # allow for L3-router TOS-policies
...
#PUB PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST
aPUB.bind(   "tcp://*:5557" )                  # expose AccessPoint on tcp://

#PULL --------------------------------------------------------------------------
aPULL = aCTX.socket( zmq.PULL )
aPULL.setsockopt(    zmq.LINGER,          0 )  # protect your server
aPULL.setsockopt(    zmq.MAXMSGSIZE,      m )  # protect your server from DoS
aPULL.setsockopt(    zmq.AFFINITY,        3 )  # protect your server resources
aPULL.setsockopt(    zmq.HEARTBEAT_IVL,   ivl )#     server L3-helper Heartbeats
aPULL.setsockopt(    zmq.HEARTBEAT_TTL,   ttl )#     server L3-helper Heartbeats
...
#PULL PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST
aPULL.bind(   "tcp://*:5558" )                 # expose AccessPoint on tcp://
...

#main loop ---------------------------------------------------------------------
pass; notSoftFLAG = True
while notSoftFLAG:
    NOP_SLEEP = 10                            #  set a 10 [ms] sleep in case NOP
    if aPULL.poll( 0 ):                       #  NEVER block/wait
       aMSG = aPULL.recv( zmq.DONTWAIT )      #  NEVER .recv()-s own data back
       #CPY = zmq_msg_copy( &aMSG );          // WARNING ABOUT NATIVE C-API
       #                                      // HANDLING, NEED .COPY()
       #                                      //           NEED .CLOSE()
       aPUB.send( aMSG,   zmq.DONTWAIT )      #  re-PUB-lish to all others but sender
       ...< process aMSG payload on server-side, if needed >...

       NOP_SLEEP = 0                          # !NOP, avoid 10[ms] NOP-loop sleep
       pass

    if aReasonToFlagLoopEXIT:
       notSoftFLAG = False
       ...
       NOP_SLEEP = 0
       pass

    if ...:
       ...
       pass

    sleep( NOP_SLEEP )                        # a soft-real-time controlled sleep on NOP
#main loop ---------------------------------------------------------------------
pass

#########
# ALWAYS:
#          better using context-aware try:/except:/finally:

aRetCODE = [ aSOCK.close() for aSOCK in ( aPUB, aPULL, ) ]
...

aCTX.term()
#   .term()
#########

相关问题