我目前正在尝试用C++协程来抽象掉io_uring。为此我有以下类:
class io_service final {
public:
explicit io_service(unsigned size, threadpool& pool) : pool_(pool) {
if (auto ret = io_uring_queue_init(size, &ring_, 0); ret < 0) {
throw std::runtime_error{"Liburing error!"};
}
}
~io_service() {
io_uring_queue_exit(&ring_);
}
void message_pump() {
io_uring_cqe* cqe = nullptr;
while (true) {
auto ret = io_uring_wait_cqe(&ring_, &cqe);
auto* data = static_cast<io_result*>(io_uring_cqe_get_data(cqe));
if (ret < 0) {
std::cerr << "Fatal error in io_uring_wait_cqe!\n";
throw std::runtime_error{"Fatal error in io_uring_wait_cqe!"};
}
if (cqe->res < 0) {
std::cerr << "Error while doing an asynchronous request: "
<< -cqe->res << " (" << strerror(-cqe->res) << ")\n";
throw std::runtime_error{"Error while doing an asynchronous request : "
+ std::string(strerror(-cqe->res))};
}
data->status_code = cqe->res;
pool_.push_task([handle = data->handle] { handle.resume(); });
io_uring_cqe_seen(&ring_, cqe);
}
}
[[nodiscard]] auto accept_async(int socket, sockaddr_in& in, socklen_t& socket_length) {
return uring_awaitable{
&ring_,
io_result::operation_type::accept,
io_uring_prep_accept,
socket,
reinterpret_cast<sockaddr*>(&in),
&socket_length,
0
};
}
private:
struct uring_awaiter {
io_uring* ring_;
io_uring_sqe* entry;
io_result request_data{};
explicit uring_awaiter(io_result::operation_type op_type, io_uring* ring, io_uring_sqe* sqe) : ring_(ring), entry(sqe), request_data{op_type} {}
[[nodiscard]] bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle<> handle) noexcept {
request_data.handle = handle;
io_uring_sqe_set_data(entry, &request_data);
// SUBMITTING HERE LATER CAUSES ERRORS ==============================
io_uring_submit(ring_);
// ==================================================================
}
[[nodiscard]] int await_resume() const noexcept {
return request_data.status_code;
}
};
class uring_awaitable {
public:
template <typename F, typename... Args>
requires requires(F f) { f(std::declval<io_uring_sqe*>(), std::declval<Args>()...); }
uring_awaitable(io_uring* ring, io_result::operation_type op, F function, Args&&... args)
: ring_(ring), sqe_(io_uring_get_sqe(ring_)), op_(op) {
function(sqe_, std::forward<Args>(args)...);
}
auto operator co_await() const {
return uring_awaiter{op_, ring_, sqe_};
}
private:
io_uring* ring_;
io_uring_sqe* sqe_;
io_result::operation_type op_;
};
io_uring ring_{};
bool interrupted_ = false;
threadpool& pool_;
};
此类应按如下方式使用:
threadpool p{};
io_service s{128, p};
// In another thread, later
co_await s.accept_async(/* ... */);
当我将io_uring_submit
放在await_resume()
中时(如上面的代码片段所示),就会出现问题。125(操作已取消)"。但是,如果我将message_pump()
更改为如下内容(并删除await_resume()
中的提交):
void message_pump() {
using namespace std::chrono_literals;
io_uring_cqe* cqe = nullptr;
while (true) {
// SUBMITTING HERE ==================================================
std::this_thread::sleep_for(1s);
io_uring_submit(&ring_);
// ==================================================================
auto ret = io_uring_wait_cqe(&ring_, &cqe);
auto* data = static_cast<io_result*>(io_uring_cqe_get_data(cqe));
if (ret < 0) {
std::cerr << "Fatal error in io_uring_wait_cqe!\n";
throw std::runtime_error{"Fatal error in io_uring_wait_cqe!"};
}
if (cqe->res < 0) {
std::cerr << "Error while doing an asynchronous request: " << -cqe->res << " (" << strerror(-cqe->res) << ")\n";
throw std::runtime_error{"Error while doing an asynchronous request : " + std::string(strerror(-cqe->res))};
}
data->status_code = cqe->res;
pool_.push_task([handle = data->handle] { handle.resume(); });
io_uring_cqe_seen(&ring_, cqe);
}
}
现在一切都按预期运行。显然这不是正确的做事方式。
为什么第一种方法不起作用?
1条答案
按热度按时间jgovgodb1#
通过内核任务完成的操作必须使用调用
io_uring_submit()
的线程。这意味着在内核中完成cqe之前,该线程不能终止。如果从动态线程池提交sqes,则有丢失完成的风险。我不能100%确定accept使用了内核任务,或者这种情况下返回
-ECANCEL
,但是由于这个原因,我不得不切换到专用线程来提交uring_cmds。liburing特性请求“submit requests from any thread“的建议是提交一个线程,或者每个线程都有自己的环(滚动到最底部)。