在c++visual studio 2019的librdkafka库中使用rd\u kafka\u conf\u set()查询创建2个代理服务器

gmxoilav  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(198)

在教程和示例代码之后,我使用visual studio 2019、apache kafka版本:kafka 2.13-2.6.0、librdkafka redist版本:1.5.2编写了一个程序来创建两个kafka使用者,每个使用者都订阅了c++中的不同主题。我在本地计算机上创建了一个kafka环境,步骤如下https://kafka.apache.org/quickstart.
我的程序如下。main.cpp有3个函数:->configkafkaconsumer()->subscribetokafkatopics(rd\u kafka\u trk,rd\u kafka\u topic\u partition\u list\u ttopics)->pollformsgs(rd\u kafka\u trk1,rd\u kafka\u trk2)
将使用者、生产者、服务器和zookeeper的所有配置设置保持在安装时的状态,通过将属性“bootstrap.servers”设置为本地主机,我在代码中创建了一个代理localhost:9092".
所以,我知道只有一个代理,一个分区(在默认配置文件中定义)。
测试应用程序的步骤:
我启动了zookeeper,然后是本地机器上的服务器。
我用.bat文件在我的windows终端上创建了两个主题(主题名称:第一主题1,第二主题1)
然后我创建了两个生产者发布上述两个主题创建。
接下来我启动了我的应用程序(运行了exe)
我从我的每个生产者发送了示例消息,并且能够看到它们出现在我的应用程序(exe窗口)上。
测试成功了。
现在,我想扩展连接到2个代理的功能。我修改了我的程序,将csv字符串声明为state 2 brokers,我提供了字符串:“localhost:9092,52.111.222.217:9092“在我的应用程序的引导服务器属性中。
在我的应用程序中做了这个更改之后,我再次遵循上面的“测试应用程序的步骤”。这一次,我无法从任何一个本地主机生产者收到任何消息。
我的假设是,即使我创建了两个代理,并将本地计算机用作一个代理,但至少应该显示来自代理的消息,而不考虑第二个代理(52.111.222.217:9092)。
但是,我在应用程序的控制台中没有看到任何消息。
任何指导,帮助我配置我的应用程序与2经纪人将高度赞赏,以及任何材料/教程参考也将是非常有帮助的。


# include <string>

# include <iostream>

# include <vector>

# include "..\include\librdkafka\rdkafkacpp.h"

# include "..\include\librdkafka\rdkafka.h"

# define ENABL_DBG_VERBOSE

using namespace std;
using namespace RdKafka;

# ifdef ENABL_DBG_VERBOSE

# define DBG(x) (x)

# else

# define DBG(x) do{}while(0)

# endif

void pollForMsgs(rd_kafka_t * rk1, rd_kafka_t * rk2)
{
    rd_kafka_resp_err_t err, err2;
    string inputKafkaMsg, inputKafkaMsg2;

    DBG(cout << "Inside pollForMsgs()" << endl);

    // The while(1) loop
    while (1) //running?
    {
        rd_kafka_message_t* rkmessage = rd_kafka_consumer_poll(rk1, 500);
        rd_kafka_message_t* rkmessage2 = rd_kafka_consumer_poll(rk2, 500);
        DBG(cout << "Inside while()" << endl);

        if (rkmessage)
        {
            DBG(cout << "Dbg: Line 691" << endl);
            if ((rkmessage == NULL))
            {
                DBG(cout << "Dbg: Line 694" << endl);
                continue;
            }

            if ((rkmessage->err))
            {
                if ((rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF))
                {
                    DBG(cout << "Dbg: Line 702" << endl);
                    continue;
                }

                if ((rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
                    rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC))
                {
                    DBG(cout << "Dbg: Line 711" << endl);
                    continue;
                }
                continue;
            }
            DBG(cout << "Dbg: Line 716" << endl);
            inputKafkaMsg = inputKafkaMsg + (char*)rkmessage->payload;

            rd_kafka_message_destroy(rkmessage);
        }
        else
        {
            DBG(cout << "Dbg: Line 728" << endl);
            DBG(cout << "inputKafkaMsg: " << inputKafkaMsg << endl);
            inputKafkaMsg = "";
        }

        // Second consumer message poller
        if (rkmessage2)
        {
            if ((rkmessage2 == NULL))
            {
                continue;
            }

            if ((rkmessage2->err))
            {
                if ((rkmessage2->err == RD_KAFKA_RESP_ERR__PARTITION_EOF))
                {
                    DBG(cout << "Dbg: Line 709" << endl);
                    continue;
                }

                if ((rkmessage2->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
                    rkmessage2->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC))
                {
                    DBG(cout << "Dbg: Line 718" << endl);
                    continue;
                }

                continue;
            }
            DBG(cout << "Dbg: Line 768" << endl);
            inputKafkaMsg2 = inputKafkaMsg2 + (char*)rkmessage2->payload;

            rd_kafka_message_destroy(rkmessage2);
        }
        else
        {
            DBG(cout << "Dbg: Line 779" << endl);
            DBG(cout << "inputKafkaMsg2: " << inputKafkaMsg2 << endl);

            inputKafkaMsg2 = "";
        }
    }

    err = rd_kafka_consumer_close(rk1);
    err2 = rd_kafka_consumer_close(rk2);

    if (err || err2)
    {
        fprintf(stderr, "%% Failed to close consumer(s): %s\n", rd_kafka_err2str(err));
    }
    else
    {
        fprintf(stderr, "%% Consumer(s) closed\n");
    }
}

void subscribeToKafkaTopics(rd_kafka_t * rk, rd_kafka_topic_partition_list_t * topics)
{
    rd_kafka_resp_err_t err;

    DBG(cout << "Inside subscribeToKafkaTopics()" << endl);

    if ((err = rd_kafka_subscribe(rk, topics)))
    {
        fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
        exit(1);
    }
}

int configKafkaConsumer()
{
    char hostname[128] = "localhost:9092";

    char errstr[512] = "";
    string kafkaBrokerServer = "localhost:9092";
    string kafkaBrokerServerList = "localhost:9092,52.111.222.217:9092";

    vector<string> firstListtopics;
    vector<string> secondListtopics;

    rd_kafka_topic_partition_list_t* list_firstListtopics = NULL;
    rd_kafka_topic_partition_list_t* list_secondListtopics = NULL;

    // Consumer Configurations
    rd_kafka_conf_t* config[2] = { NULL, NULL };

    // Kafka handles configured (as consumers)
    rd_kafka_t* rk[2] = { NULL, NULL };

    unsigned int i = 0;

    DBG(cout << "Inside config_kafka_consumer()" << endl);

    firstListtopics.push_back("firsttopic1");
    secondListtopics.push_back("secondtopic1");

    // Add topic to the first topic list
    list_firstListtopics = rd_kafka_topic_partition_list_new((int)firstListtopics.size());

    for (unsigned int i = 0; i < firstListtopics.size(); i++)
    {
        rd_kafka_topic_partition_list_add(list_firstListtopics, firstListtopics[i].c_str(),
            RD_KAFKA_PARTITION_UA);
    }

    // Add topic to the second topic list
    list_secondListtopics = rd_kafka_topic_partition_list_new((int)secondListtopics.size());

    for (unsigned int i = 0; i < secondListtopics.size(); i++)
    {
        rd_kafka_topic_partition_list_add(list_secondListtopics, secondListtopics[i].c_str(),
            RD_KAFKA_PARTITION_UA);
    }

    for (i = 0; i < 2; i++)
    {
        config[i] = rd_kafka_conf_new();

        // Configure the conf properties 'client.id' to identify the consumer here
        if (rd_kafka_conf_set(config[i], "client.id", hostname,
            errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
        {
            fprintf(stderr, "%% %s\n", errstr);
            DBG(cout << "client.id:" << endl);
            exit(1);
        }

        // Configure the conf properties 'group.id' to identify the consumer here
        if (rd_kafka_conf_set(config[i], "group.id", "foo",
            errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
        {
            fprintf(stderr, "%% %s\n", errstr);
            DBG(cout << "group.id:" << endl);
            exit(1);
        }

        // "localhost:9092,52.111.222.217:9092"
        // Configure the broker list here "host1:9092 host2:etc ..."
/***THE BELOW IS NOT WORKING, WHEREAS JUST "localhost:9092 works FINE*****/
        if (rd_kafka_conf_set(config[i], "bootstrap.servers", "localhost:9092,52.111.222.217:9092",
            errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
        {
            fprintf(stderr, "%% %s\n", errstr);
            DBG(cout << "bootstrap.servers" << endl);
            exit(1);
        }

        // Create consumer handles
        if (!(rk[i] = rd_kafka_new(RD_KAFKA_CONSUMER, config[i],
            errstr, sizeof(errstr))))
        {
            fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
            exit(1);
        }
    }
    subscribeToKafkaTopics(rk[0], list_firstListtopics);
    subscribeToKafkaTopics(rk[1], list_secondListtopics);

    pollForMsgs(rk[0], rk[1]);

    // destroy lists
}

void main()
{
    configKafkaConsumer();
    while (1);
}

要重现相同的问题:只需按标题“测试应用程序的步骤”下的上述步骤运行本地kafka服务器localhost:9092“和”localhost:9092,52.111.222.217:9092". 在前一种情况下,生产者发送的任何消息都会显示在消费者端;而在后者的情况下,没有消息会出现在消费者方面。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题