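I have a producer running on the main thread and a consumer running on its own thread (std::thread). I have a simple program that sends one message with the producer and then puts the main thread to sleep before trying to send another message.
Whenever my main thread goes to sleep, the program simply exits. No exception is thrown. The same thing happens when I try to stop and delete my consumer/producer properly. Clearly I am doing something wrong, but I can't tell what, because my program reports no error of any kind. The last log message I see is the one printed just before the main thread goes to sleep.
I have put try/catch blocks in both main and the consumer thread. I also called std::set_terminate and added logging inside the handler. When my program exits, nothing is caught by either the try/catch blocks or the terminate handler.
Any suggestions?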
Update #1 [Source]
As Sid pointed out, I left out the obvious: the source.
main.cc
int main(int argc, char**argv) {
  std::cout << "% Main started." << std::endl;
  std::set_terminate([](){
    std::cerr << "% Terminate occurred in main." << std::endl;
    abort();
  });
  try {
    using com::anya::core::networking::KafkaMessenger;
    using com::anya::core::common::MessengerCode;
    KafkaMessenger messenger;
    auto promise = std::promise<bool>();
    auto future = promise.get_future();
    messenger.Connect([&promise](MessengerCode code, std::string& message) {
      promise.set_value(true);
    });
    future.get();
    std::cout << "% Main connection successful." << std::endl;
    // Produce 5 messages 5 seconds apart.
    int number_of_messages_sent = 0;
    while (number_of_messages_sent < 5) {
      std::stringstream message;
      message << "message-" << number_of_messages_sent;
      auto message_send_promise = std::promise<bool>();
      auto message_send_future = message_send_promise.get_future();
      messenger.SendMessage(message.str(), [&message_send_promise](MessengerCode code) {
        std::cout << "% Main message sent" << std::endl;
        message_send_promise.set_value(true);
      });
      message_send_future.get();
      number_of_messages_sent++;
      std::cout << "% Main going to sleep for 5 seconds." << std::endl;
      std::this_thread::sleep_for(std::chrono::seconds(5));
    }
    // Disconnect from Kafka and cleanup.
    auto disconnect_promise = std::promise<bool>();
    auto disconnect_future = disconnect_promise.get_future();
    messenger.Disconnect([&disconnect_promise](MessengerCode code, std::string& message) {
      disconnect_promise.set_value(true);
    });
    disconnect_future.get();
    std::cout << "% Main disconnect complete." << std::endl;
  } catch (std::exception& exception) {
    std::cerr << "% Exception caught in main with error: " << exception.what() << std::endl;
    exit(1);
  }
  std::cout << "% Main exited." << std::endl;
  exit(0);
}
kafkamessenger.cc [consumer section]
void KafkaMessenger::Connect(std::function<void(MessengerCode , std::string&)> impl) {
  assert(!running_.load());
  running_.store(true);
  // For the sake of brevity I've removed a whole bunch of Kafka configuration setup from the sample code.
  RdKafka::ErrorCode consumer_response = consumer_->start(topic_for_consumer, 0, RdKafka::Topic::OFFSET_BEGINNING);
  if (consumer_response != RdKafka::ERR_NO_ERROR) {
    running_.store(false);
    delete consumer_;
    delete producer_;
    error = RdKafka::err2str(consumer_response);
    impl(MessengerCode::CONNECT_FAILED, error);
  }
  auto consumer_thread_started_promise = std::promise<bool>();
  auto consumer_thread_started_future = consumer_thread_started_promise.get_future();
  consumer_thread_ = std::thread([this, &topic_for_consumer, &consumer_thread_started_promise]() {
    try {
      std::cout << "% Consumer thread started." << std::endl;
      consumer_thread_started_promise.set_value(true);
      while (running_.load()) {
        RdKafka::Message* message = consumer_->consume(topic_for_consumer, 0, 5000);
        switch (message->err()) {
          case RdKafka::ERR_NO_ERROR: {
            std::string message_string((char*) message->payload());
            std::cout << "% Consumer received message: " << message_string << std::endl;
            delete message;
            break;
          }
          default:
            std::cerr << "% Consumer consumption failed: " << message->errstr() << " error code=" << message->err() << std::endl;
            break;
        }
      }
      std::cout << "% Consumer shutting down." << std::endl;
      if (consumer_->stop(topic_for_consumer, 0) != RdKafka::ERR_NO_ERROR) {
        std::cerr << "% Consumer error while trying to stop." << std::endl;
      }
    } catch (std::exception& exception) {
      std::cerr << "% Caught exception in consumer thread: " << exception.what() << std::endl;
    }
  });
  consumer_thread_started_future.get();
  std::string message("Consumer connected");
  impl(MessengerCode::CONNECT_SUCCESS, message);
}
kafkamessenger.cc [producer section]
void KafkaMessenger::SendMessage(std::string message, std::function<void(MessengerCode)> impl) {
  assert(running_.load());
  std::cout << "% Producer sending message." << std::endl;
  RdKafka::ErrorCode producer_response = producer_->produce(
      producer_topic_,
      RdKafka::Topic::PARTITION_UA,
      RdKafka::Producer::RK_MSG_COPY,
      static_cast<void*>(&message), message.length(), nullptr, nullptr);
  switch (producer_response) {
    case RdKafka::ERR_NO_ERROR: {
      std::cout << "% Producer Successfully sent (" << message.length() << " bytes)" << std::endl;
      impl(MessengerCode::MESSAGE_SEND_SUCCESS);
      break;
    }
    case RdKafka::ERR__QUEUE_FULL: {
      std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
      impl(MessengerCode::MESSAGE_SEND_FAILED);
      break;
    }
    case RdKafka::ERR__UNKNOWN_PARTITION: {
      std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
      impl(MessengerCode::MESSAGE_SEND_FAILED);
      break;
    }
    case RdKafka::ERR__UNKNOWN_TOPIC: {
      std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
      impl(MessengerCode::MESSAGE_SEND_FAILED);
      break;
    }
    default: {
      std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
      impl(MessengerCode::MESSAGE_SEND_FAILED);
      break;
    }
  }
}
Output
When I run the main method, this is the output I see in the console.
% Main started.
% Consumer thread started.
% Main connection successful.
% Producer sending message.
% Producer Successfully sent (9 bytes)
% Main message sent
% Main going to sleep for 5 seconds.
% Consumer received message: message-
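On closer inspection, I don't think the sleep is the cause, because this still happens when I remove the sleep. As you can see in the last log line, the consumer prints the message it received with the last character cut off. The payload should read message-0. So something is dying somewhere.
Update #2 [Stack trace]
I stumbled on this old but very useful post about catching signals and printing the stack trace. I implemented that solution, and now I can see more information about the crash.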
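For readers who have not seen this kind of handler, the following is a minimal sketch of the general technique, not the exact code from that post or from my project. It assumes a platform that provides execinfo.h (glibc on Linux, or macOS); the names CrashHandler and InstallCrashHandler are made up for illustration.

```cpp
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <execinfo.h>
#include <unistd.h>

// Hypothetical handler name. Prints the signal number and a raw backtrace
// to stderr, then terminates the process immediately.
// Note: fprintf is not async-signal-safe; this is a debugging aid only.
extern "C" void CrashHandler(int sig) {
  void* frames[32];
  // Capture up to 32 return addresses from the current call stack.
  int count = backtrace(frames, 32);
  std::fprintf(stderr, "Error: signal %d:\n", sig);
  // Write symbolized frames straight to the stderr file descriptor.
  backtrace_symbols_fd(frames, count, STDERR_FILENO);
  _exit(1);
}

// Hypothetical helper: registers the handler for SIGSEGV.
// Returns false if registration failed.
bool InstallCrashHandler() {
  return std::signal(SIGSEGV, CrashHandler) != SIG_ERR;
}
```

Calling InstallCrashHandler() once at the top of main is enough; on some platforms the symbol names only appear if the binary is linked with exported symbols (for example -rdynamic with GCC on Linux).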
Error: signal 11:
0 main 0x00000001012e4eec _ZN3com4anya4core10networking7handlerEi + 28
1 libsystem_platform.dylib 0x00007fff60511f5a _sigtramp + 26
2 ??? 0x0000000000000000 0x0 + 0
3 main 0x00000001012f2866 rd_kafka_poll_cb + 838
4 main 0x0000000101315fee rd_kafka_q_serve + 590
5 main 0x00000001012f5d46 rd_kafka_flush + 182
6 main 0x00000001012e7f1a _ZN3com4anya4core10networking14KafkaMessenger10DisconnectENSt3__18functionIFvNS1_6common13MessengerCodeENS4_12basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEEEEE + 218
7 main 0x00000001012dbc45 main + 3221
8 libdyld.dylib 0x00007fff60290115 start + 1
9 ??? 0x0000000000000001 0x0 + 1
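As part of my shutdown method I call producer->flush(1000), and that call is what produces this stack trace. If I remove it, shutdown works fine. Clearly I have misconfigured something, and that is what causes the segmentation fault when I try to flush.
Update #3 [Solution]
It turns out that the classes I use to handle Kafka event logging and delivery reports were scoped to a single method. That is a problem because the librdkafka library takes these objects by reference, so by the time my main runner method exited and cleanup started, those objects were already gone. I moved the loggers to class scope, and that fixed the crash.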
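To illustrate the lifetime issue without pulling in librdkafka, here is a minimal sketch under stated assumptions. FakeProducer, DeliveryCallback, RecordingCallback and Messenger are hypothetical stand-ins, not librdkafka types; the only point is that a client holding a non-owning pointer to a callback object needs that object to outlive every call that may invoke it, including a flush during shutdown.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for a callback interface such as a delivery report handler.
struct DeliveryCallback {
  virtual ~DeliveryCallback() = default;
  virtual void OnDelivered(const std::string& payload) = 0;
};

// Hypothetical client that stores only a non-owning pointer to the callback.
class FakeProducer {
 public:
  void SetCallback(DeliveryCallback* cb) { cb_ = cb; }
  // Invokes the callback, much as a flush would serve queued delivery reports.
  void Flush(const std::string& payload) {
    if (cb_ != nullptr) cb_->OnDelivered(payload);
  }

 private:
  DeliveryCallback* cb_ = nullptr;
};

// Records every payload it is told about.
class RecordingCallback : public DeliveryCallback {
 public:
  void OnDelivered(const std::string& payload) override {
    delivered.push_back(payload);
  }
  std::vector<std::string> delivered;
};

class Messenger {
 public:
  // The callback is a class member, so it is still alive when Disconnect runs.
  // Had it been a local variable here, producer_ would hold a dangling pointer.
  void Connect() { producer_.SetCallback(&delivery_cb_); }
  void Disconnect() { producer_.Flush("message-0"); }
  const std::vector<std::string>& delivered() const {
    return delivery_cb_.delivered;
  }

 private:
  // Declared before producer_, so it is destroyed after producer_.
  RecordingCallback delivery_cb_;
  FakeProducer producer_;
};
```

Declaring the callback member before the producer member matters here: members are destroyed in reverse declaration order, so the callback outlives the object that points at it.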
1 Answer
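Kafka message payloads are just binary data. Unless you send the string with a trailing NUL byte, the payload will not contain one, so the std::string constructor reads on into adjacent memory looking for a NUL. That may touch unmapped memory and crash your application, or at the very least garble your terminal.
Use the message length together with the payload to construct a std::string limited to the actual number of bytes. Printing it is still unsafe, but it's a start: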
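A minimal sketch of that idea follows. It is written against a plain pointer and length so it stands alone; with librdkafka the two arguments would come from message->payload() and message->len(). The helper names PayloadToString and ToPrintable are made up for illustration, and ToPrintable is just one possible way to make the result safer to print.

```cpp
#include <cctype>
#include <cstddef>
#include <string>

// Hypothetical helper: copies exactly len bytes from payload, never
// scanning past the end of the buffer for a terminating NUL.
std::string PayloadToString(const void* payload, std::size_t len) {
  if (payload == nullptr || len == 0) return std::string();
  return std::string(static_cast<const char*>(payload), len);
}

// Hypothetical helper: replaces non-printable bytes with '.' so that
// arbitrary binary data cannot emit control sequences to the terminal.
std::string ToPrintable(const std::string& data) {
  std::string out;
  out.reserve(data.size());
  for (unsigned char c : data) {
    out.push_back(std::isprint(c) ? static_cast<char>(c) : '.');
  }
  return out;
}
```

In the consumer loop from the question, this would replace the one-argument std::string constructor, for example std::string message_string = PayloadToString(message->payload(), message->len());.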