在教程和示例代码之后,我使用visual studio 2019、apache kafka版本:kafka 2.13-2.6.0、librdkafka redist版本:1.5.2编写了一个程序来创建两个kafka使用者,每个使用者都订阅了c++中的不同主题。我在本地计算机上创建了一个kafka环境,步骤如下https://kafka.apache.org/quickstart.
我的程序如下。main.cpp有3个函数:->configkafkaconsumer()->subscribetokafkatopics(rd\u kafka\u trk,rd\u kafka\u topic\u partition\u list\u ttopics)->pollformsgs(rd\u kafka\u trk1,rd\u kafka\u trk2)
将使用者、生产者、服务器和zookeeper的所有配置设置保持在安装时的状态,通过将属性“bootstrap.servers”设置为本地主机,我在代码中创建了一个代理localhost:9092".
所以,我知道只有一个代理,一个分区(在默认配置文件中定义)。
测试应用程序的步骤:
我启动了zookeeper,然后是本地机器上的服务器。
我用.bat文件在我的windows终端上创建了两个主题(主题名称:第一主题1,第二主题1)
然后我创建了两个生产者发布上述两个主题创建。
接下来我启动了我的应用程序(运行了exe)
我从我的每个生产者发送了示例消息,并且能够看到它们出现在我的应用程序(exe窗口)上。
测试成功了。
现在,我想扩展连接到2个代理的功能。我修改了我的程序,将csv字符串声明为state 2 brokers,我提供了字符串:“localhost:9092,52.111.222.217:9092“在我的应用程序的引导服务器属性中。
在我的应用程序中做了这个更改之后,我再次遵循上面的“测试应用程序的步骤”。这一次,我无法从任何一个本地主机生产者收到任何消息。
我的假设是,即使我创建了两个代理,并将本地计算机用作一个代理,但至少应该显示来自代理的消息,而不考虑第二个代理(52.111.222.217:9092)。
但是,我在应用程序的控制台中没有看到任何消息。
任何指导,帮助我配置我的应用程序与2经纪人将高度赞赏,以及任何材料/教程参考也将是非常有帮助的。
# include <string>
# include <iostream>
# include <vector>
# include "..\include\librdkafka\rdkafkacpp.h"
# include "..\include\librdkafka\rdkafka.h"
# define ENABL_DBG_VERBOSE
using namespace std;
using namespace RdKafka;
# ifdef ENABL_DBG_VERBOSE
# define DBG(x) (x)
# else
# define DBG(x) do{}while(0)
# endif
void pollForMsgs(rd_kafka_t * rk1, rd_kafka_t * rk2)
{
rd_kafka_resp_err_t err, err2;
string inputKafkaMsg, inputKafkaMsg2;
DBG(cout << "Inside pollForMsgs()" << endl);
// The while(1) loop
while (1) //running?
{
rd_kafka_message_t* rkmessage = rd_kafka_consumer_poll(rk1, 500);
rd_kafka_message_t* rkmessage2 = rd_kafka_consumer_poll(rk2, 500);
DBG(cout << "Inside while()" << endl);
if (rkmessage)
{
DBG(cout << "Dbg: Line 691" << endl);
if ((rkmessage == NULL))
{
DBG(cout << "Dbg: Line 694" << endl);
continue;
}
if ((rkmessage->err))
{
if ((rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF))
{
DBG(cout << "Dbg: Line 702" << endl);
continue;
}
if ((rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC))
{
DBG(cout << "Dbg: Line 711" << endl);
continue;
}
continue;
}
DBG(cout << "Dbg: Line 716" << endl);
inputKafkaMsg = inputKafkaMsg + (char*)rkmessage->payload;
rd_kafka_message_destroy(rkmessage);
}
else
{
DBG(cout << "Dbg: Line 728" << endl);
DBG(cout << "inputKafkaMsg: " << inputKafkaMsg << endl);
inputKafkaMsg = "";
}
// Second consumer message poller
if (rkmessage2)
{
if ((rkmessage2 == NULL))
{
continue;
}
if ((rkmessage2->err))
{
if ((rkmessage2->err == RD_KAFKA_RESP_ERR__PARTITION_EOF))
{
DBG(cout << "Dbg: Line 709" << endl);
continue;
}
if ((rkmessage2->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
rkmessage2->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC))
{
DBG(cout << "Dbg: Line 718" << endl);
continue;
}
continue;
}
DBG(cout << "Dbg: Line 768" << endl);
inputKafkaMsg2 = inputKafkaMsg2 + (char*)rkmessage2->payload;
rd_kafka_message_destroy(rkmessage2);
}
else
{
DBG(cout << "Dbg: Line 779" << endl);
DBG(cout << "inputKafkaMsg2: " << inputKafkaMsg2 << endl);
inputKafkaMsg2 = "";
}
}
err = rd_kafka_consumer_close(rk1);
err2 = rd_kafka_consumer_close(rk2);
if (err || err2)
{
fprintf(stderr, "%% Failed to close consumer(s): %s\n", rd_kafka_err2str(err));
}
else
{
fprintf(stderr, "%% Consumer(s) closed\n");
}
}
void subscribeToKafkaTopics(rd_kafka_t * rk, rd_kafka_topic_partition_list_t * topics)
{
rd_kafka_resp_err_t err;
DBG(cout << "Inside subscribeToKafkaTopics()" << endl);
if ((err = rd_kafka_subscribe(rk, topics)))
{
fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
exit(1);
}
}
int configKafkaConsumer()
{
char hostname[128] = "localhost:9092";
char errstr[512] = "";
string kafkaBrokerServer = "localhost:9092";
string kafkaBrokerServerList = "localhost:9092,52.111.222.217:9092";
vector<string> firstListtopics;
vector<string> secondListtopics;
rd_kafka_topic_partition_list_t* list_firstListtopics = NULL;
rd_kafka_topic_partition_list_t* list_secondListtopics = NULL;
// Consumer Configurations
rd_kafka_conf_t* config[2] = { NULL, NULL };
// Kafka handles configured (as consumers)
rd_kafka_t* rk[2] = { NULL, NULL };
unsigned int i = 0;
DBG(cout << "Inside config_kafka_consumer()" << endl);
firstListtopics.push_back("firsttopic1");
secondListtopics.push_back("secondtopic1");
// Add topic to the first topic list
list_firstListtopics = rd_kafka_topic_partition_list_new((int)firstListtopics.size());
for (unsigned int i = 0; i < firstListtopics.size(); i++)
{
rd_kafka_topic_partition_list_add(list_firstListtopics, firstListtopics[i].c_str(),
RD_KAFKA_PARTITION_UA);
}
// Add topic to the second topic list
list_secondListtopics = rd_kafka_topic_partition_list_new((int)secondListtopics.size());
for (unsigned int i = 0; i < secondListtopics.size(); i++)
{
rd_kafka_topic_partition_list_add(list_secondListtopics, secondListtopics[i].c_str(),
RD_KAFKA_PARTITION_UA);
}
for (i = 0; i < 2; i++)
{
config[i] = rd_kafka_conf_new();
// Configure the conf properties 'client.id' to identify the consumer here
if (rd_kafka_conf_set(config[i], "client.id", hostname,
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
fprintf(stderr, "%% %s\n", errstr);
DBG(cout << "client.id:" << endl);
exit(1);
}
// Configure the conf properties 'group.id' to identify the consumer here
if (rd_kafka_conf_set(config[i], "group.id", "foo",
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
fprintf(stderr, "%% %s\n", errstr);
DBG(cout << "group.id:" << endl);
exit(1);
}
// "localhost:9092,52.111.222.217:9092"
// Configure the broker list here "host1:9092 host2:etc ..."
/***THE BELOW IS NOT WORKING, WHEREAS JUST "localhost:9092 works FINE*****/
if (rd_kafka_conf_set(config[i], "bootstrap.servers", "localhost:9092,52.111.222.217:9092",
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
fprintf(stderr, "%% %s\n", errstr);
DBG(cout << "bootstrap.servers" << endl);
exit(1);
}
// Create consumer handles
if (!(rk[i] = rd_kafka_new(RD_KAFKA_CONSUMER, config[i],
errstr, sizeof(errstr))))
{
fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
exit(1);
}
}
subscribeToKafkaTopics(rk[0], list_firstListtopics);
subscribeToKafkaTopics(rk[1], list_secondListtopics);
pollForMsgs(rk[0], rk[1]);
// destroy lists
}
void main()
{
configKafkaConsumer();
while (1);
}
要重现相同的问题:只需按标题“测试应用程序的步骤”下的上述步骤运行本地kafka服务器localhost:9092“和”localhost:9092,52.111.222.217:9092". 在前一种情况下,生产者发送的任何消息都会显示在消费者端;而在后者的情况下,没有消息会出现在消费者方面。
暂无答案!
目前还没有任何答案,快来回答吧!