kafka c++客户端需要很长时间才能接收消息

nkoocmlb  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(623)

我使用的是cppkafka库,librdkafka的 Package 器,而c++kafka客户机用于一个非常简单的消息流任务。我的consumer类行为异常,因为接收消息需要相当长的时间。更准确地说,每次运行并保持运行接收可执行文件时,使用者都可以正确地接收第一批消息,但随后的消息大约需要15秒才能到达。有人知道什么样的可能性会导致这样的事情(Kafka配置、特定于库的问题或我愚蠢的错误)?万分感谢。
我的接收线如下

configuration_.set("group.id", 0);
consumer_ = std::make_unique<cppkafka::Consumer>(configuration_);
consumer_->subscribe({TopicTraits<trade::OrderRequest>::topic, TopicTraits<trade::CancelRequest>::topic});
std::thread([this] {
  while (working_) {
    cppkafka::Message msg = consumer_->poll();
    if (msg) {
      if (msg.get_error()) {
        if (!msg.is_eof()) {
          ERROR("error occurred while polling message: {}", msg.get_error());
        }
      } else {
        try {
          Json j = Json::parse(msg.get_payload());
          if (msg.get_topic() == TopicTraits<trade::OrderRequest>::topic) {
            INFO("received [order_req], {}", msg.get_payload());
            ReceiveOrderRequest(j.get<trade::OrderRequest>());
          } else if (msg.get_topic() == TopicTraits<trade::CancelRequest>::topic) {
            INFO("received [cancel_req], {}", msg.get_payload());
            ReceiveCancelRequest(j.get<trade::CancelRequest>());
          }
        } catch (const std::exception &e) {
          ERROR("error occurred while handling incoming message, {}", e.what());
        }
      }
    }
  }
}).detach();
pobjuy32

pobjuy321#

两个具有相同组id的使用者订阅了不同的主题阻止了poll()
经过一些研究,我发现这个问题与Kafka的一个更基本的配置选项有关。问题是,我的使用者在调用poll()时被阻止,其直接原因是具有相同组id的两个使用者订阅了不同的主题。我重新分配了组id,问题就消失了。

相关问题