c++ zeromq pub sub上丢失的消息

bqf10yzr  于 2023-04-01  发布在  其他
关注(0)|答案(4)|浏览(185)

我正在尝试使用zeromq框架实现pub sub设计模式。这个想法是启动一个订阅者,然后启动发布者。订阅者将侦听100条消息,发布者将发布100条消息。到目前为止一切顺利......然而实际情况是,即使订阅者在发布者启动时已经启动并运行,不是所有的消息都被订阅者接收到(如果发布者将发送至少500条消息,则订阅者将拾取100条消息)。
有什么想法吗
先谢谢你了,奥默。

订阅者代码(在发布者之前启动)

int i=0;
zmq::context_t context (1);
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5556");
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);

for (int update_nbr = 0; update_nbr < 100; update_nbr++) 
{        
    zmq::message_t update;
    subscriber.recv(&update);
    i++;
    std::cout<<"receiving  :"<<i<<std::endl;
}

发布者代码(在订阅者之后启动)

zmq::context_t context (1);
zmq::socket_t publisher (context, ZMQ_PUB);
publisher.bind("tcp://*:5556");

int i = 0;
for (int update_nbr = 0; update_nbr < 100; update_nbr++) 
{        
    //  Send message to all subscribers
    zmq::message_t request (20);

    time_t seconds;
    seconds = time (NULL);

    char update [20]="";
    sprintf (update, "%ld", seconds);

    memcpy ((void *) request.data (), update,strlen(update));
    publisher.send(request);
    i++;
    std::cout << "sending :" << i << std::endl;

}

ttcibm8c

ttcibm8c1#

参见https://zguide.zeromq.org/docs/chapter2/#Missing-Message-Problem-Solver(图25中的流程图和下面的解释)
基本上,建立连接需要一点时间(几毫秒),在这段时间里,大量的消息可能会丢失。发布者需要在开始发布之前休息一段时间,或者(更好)它需要显式地与订阅者同步。

nafvub8i

nafvub8i2#

请看指南。
1.发布者发送“您好”
1.每个接收到“hello”的订阅者通过REQ/REP套接字向发布者发送消息
1.当发布者得到足够的REQ/REP消息时,它开始发布数据

kx7yvsdv

kx7yvsdv3#

在0MQ中,成功的send()并不意味着数据立即通过网络发送。http://api.zeromq.org/2-1:zmq-send。您的消息非常小,AFAIR 0MQ为小消息做了某种缓冲,以更有效地使用网络。
如果我没记错的话,0MQ的config.hpp中的out_batch_size控制着这样的行为。

smtd7mpg

smtd7mpg4#

有一件事要看(超出了前面的评论者所指出的)是你的关机过程。
代码片段可能只是不完整,但我看不出你是如何处理关机的。特别是你可能实际上丢失了发送的 * 最后 * 条消息。请查看zmq_closezmq_termZMQ_LINGER的文档。如果你 * 不是 * 实际调用这些函数,而是简单地终止应用程序,那么有可能已经用**zmq_send()**发送但还没有传输到网络的消息在关机时丢失。
要检查丢失了哪些消息,除了时间戳之外,您还可以尝试发送序列号。

相关问题