c++ liburing:io_uring_submit()置于await_suspend中时会导致错误

wlzqhblo  于 2022-12-01  发布在  其他
关注(0)|答案(1)|浏览(86)

我目前正在尝试用C++协程来抽象掉io_uring。为此我有以下类:

/// Coroutine-facing wrapper around one io_uring instance.
///
/// Requests are prepared through awaitables returned by the `*_async`
/// members, submitted when the awaiting coroutine suspends, and completed by
/// `message_pump()`, which hands the suspended coroutine to the thread pool
/// for resumption.
class io_service final {
public:
   /// Initialises the ring.
   ///
   /// @param size  Forwarded to io_uring_queue_init as the entry count.
   /// @param pool  Pool on which completed coroutines are resumed; only a
   ///              reference is stored.
   /// @throws std::runtime_error if io_uring_queue_init reports an error.
   explicit io_service(unsigned size, threadpool& pool) : pool_(pool) {
      if (auto ret = io_uring_queue_init(size, &ring_, 0); ret < 0) {
         throw std::runtime_error{"Liburing error!"};
      }
   }

   ~io_service() {
      io_uring_queue_exit(&ring_);
   }

   // The object owns the ring and hands out `&ring_` to its awaitables, so a
   // copy would tear the ring down twice and a move would leave those
   // pointers dangling.
   io_service(const io_service&) = delete;
   io_service& operator=(const io_service&) = delete;
   io_service(io_service&&) = delete;
   io_service& operator=(io_service&&) = delete;

   /// Blocks on the completion queue forever, resuming one coroutine per
   /// completion on the thread pool.
   ///
   /// @throws std::runtime_error if waiting fails or a request completed
   ///         with a negative result.
   void message_pump() {
      io_uring_cqe* cqe = nullptr;

      while (true) {
         // Check the wait result first: `cqe` is only meaningful when the
         // wait succeeded, so it must not be inspected before this test.
         if (auto ret = io_uring_wait_cqe(&ring_, &cqe); ret < 0) {
            std::cerr << "Fatal error in io_uring_wait_cqe!\n";
            throw std::runtime_error{"Fatal error in io_uring_wait_cqe!"};
         }

         if (cqe->res < 0) {
            std::cerr << "Error while doing an asynchronous request: "
                      << -cqe->res << " (" << strerror(-cqe->res) << ")\n";
            throw std::runtime_error{"Error while doing an asynchronous request : "
                        + std::string(strerror(-cqe->res))};
         }

         // The user data is the awaiter's io_result (set in await_suspend).
         auto* data = static_cast<io_result*>(io_uring_cqe_get_data(cqe));
         data->status_code = cqe->res;

         // Copy the handle into the task: once the coroutine resumes, the
         // awaiter that `data` points into may no longer exist.
         pool_.push_task([handle = data->handle] { handle.resume(); });
         io_uring_cqe_seen(&ring_, cqe);
      }
   }

   /// Prepares an accept request on `socket`.
   ///
   /// @param socket         Listening socket descriptor.
   /// @param in             Receives the peer address; must stay alive until
   ///                       the request completes, since only its address is
   ///                       handed to the ring.
   /// @param socket_length  Address-length in/out value, same lifetime rule.
   /// @return An awaitable; `co_await` yields the completion's result code.
   /// @throws std::runtime_error if no submission queue entry is available.
   [[nodiscard]] auto accept_async(int socket, sockaddr_in& in, socklen_t& socket_length) {
      return uring_awaitable{
         &ring_,
         io_result::operation_type::accept,
         io_uring_prep_accept,
         socket,
         reinterpret_cast<sockaddr*>(&in),
         &socket_length,
         0
      };
   }


private:
   /// Awaiter that ties one prepared submission entry to the suspended
   /// coroutine and submits it.
   struct uring_awaiter {
      io_uring* ring_;
      io_uring_sqe* entry;
      io_result request_data{};

      explicit uring_awaiter(io_result::operation_type op_type, io_uring* ring, io_uring_sqe* sqe) : ring_(ring), entry(sqe), request_data{op_type} {}

      /// Always suspends: the result only exists after the ring completes.
      [[nodiscard]] bool await_ready() const noexcept { return false; }

      /// Records the coroutine handle as the request's user data and submits.
      void await_suspend(std::coroutine_handle<> handle) noexcept {
         request_data.handle = handle;
         io_uring_sqe_set_data(entry, &request_data);


         // SUBMITTING HERE LATER CAUSES ERRORS ==============================
         // NOTE(review): this runs on whichever thread is executing the
         // coroutine. The kernel may cancel in-flight requests when the
         // submitting thread exits (seen as -ECANCELED) - confirm that the
         // submitting thread outlives its requests, or submit from one
         // dedicated thread.
         io_uring_submit(ring_);
         // ==================================================================


      }

      /// Returns the result code stored by message_pump().
      [[nodiscard]] int await_resume() const noexcept {
         return request_data.status_code;
      }
   };

   /// Grabs a submission entry and runs a liburing prep function on it; the
   /// entry is submitted later, when the awaitable is awaited.
   class uring_awaitable {
   public:
      /// @param ring      Ring to take the submission entry from.
      /// @param op        Operation tag stored in the request's io_result.
      /// @param function  Prep function invoked as `function(sqe, args...)`.
      /// @throws std::runtime_error if the ring has no free submission entry.
      template <typename F, typename... Args>
         requires requires(F f) { f(std::declval<io_uring_sqe*>(), std::declval<Args>()...); }
      uring_awaitable(io_uring* ring, io_result::operation_type op, F function, Args&&... args)
         : ring_(ring), sqe_(io_uring_get_sqe(ring_)), op_(op) {
         // io_uring_get_sqe yields a null pointer when the submission queue
         // is full; preparing a request on it would dereference null.
         if (sqe_ == nullptr) {
            throw std::runtime_error{"Liburing error: submission queue is full!"};
         }
         function(sqe_, std::forward<Args>(args)...);
      }

      auto operator co_await() const {
         return uring_awaiter{op_, ring_, sqe_};
      }

   private:
      io_uring* ring_;
      io_uring_sqe* sqe_;
      io_result::operation_type op_;
   };

   io_uring ring_{};
   bool interrupted_ = false;
   threadpool& pool_;
};

此类应按如下方式使用:

// Pool on which message_pump() resumes completed coroutines.
threadpool p{};
// 128 is forwarded to io_uring_queue_init as the ring size; the service keeps
// only a reference to `p`.
io_service s{128, p};

// In another thread, later, inside a coroutine: awaiting suspends it until
// message_pump() sees the completion; the expression yields the result code.
co_await s.accept_async(/* ... */);

当我将io_uring_submit()放在await_suspend()中时(如上面的代码片段所示),就会出现问题:程序报错"Error while doing an asynchronous request: 125 (Operation canceled)"(操作已取消)。但是,如果我将message_pump()更改为如下内容(并删除await_suspend()中的提交):

/// Variant of the completion loop that also performs the submission.
///
/// Each iteration sleeps for one second, submits whatever entries have been
/// prepared on the ring, then waits for one completion and schedules the
/// matching coroutine on the thread pool.
///
/// @throws std::runtime_error if waiting fails or a request completed with a
///         negative result.
void message_pump() {
   using namespace std::chrono_literals;

   io_uring_cqe* cqe = nullptr;

   while (true) {
      // SUBMITTING HERE ==================================================
      std::this_thread::sleep_for(1s);
      io_uring_submit(&ring_);
      // ==================================================================

      // Check the wait result first: `cqe` is only meaningful when the wait
      // succeeded, so it must not be inspected before this test.
      if (auto ret = io_uring_wait_cqe(&ring_, &cqe); ret < 0) {
         std::cerr << "Fatal error in io_uring_wait_cqe!\n";
         throw std::runtime_error{"Fatal error in io_uring_wait_cqe!"};
      }

      if (cqe->res < 0) {
         std::cerr << "Error while doing an asynchronous request: " << -cqe->res << " (" << strerror(-cqe->res) << ")\n";
         throw std::runtime_error{"Error while doing an asynchronous request : " + std::string(strerror(-cqe->res))};
      }

      // The completion's user data is read back as the request's io_result.
      auto* data = static_cast<io_result*>(io_uring_cqe_get_data(cqe));
      data->status_code = cqe->res;

      // Copy the handle into the task so the task does not depend on `data`
      // after the coroutine has been resumed.
      pool_.push_task([handle = data->handle] { handle.resume(); });
      io_uring_cqe_seen(&ring_, cqe);
   }
}

现在一切都按预期运行。显然这不是正确的做事方式。
为什么第一种方法不起作用?

jgovgodb

jgovgodb1#

通过内核任务工作(task work)完成的操作,依赖于调用io_uring_submit()的那个线程。这意味着在内核完成对应的cqe之前,该线程不能终止。如果从动态线程池提交sqe,就有丢失完成事件的风险。
我不能100%确定accept是否使用了内核任务工作,也不确定这种情况下是否会返回-ECANCELED;但正是出于这个原因,我不得不改用一个专用线程来提交uring_cmd。
liburing的特性请求"submit requests from any thread"中给出的建议是:只用一个线程负责提交,或者让每个线程都拥有自己的ring(滚动到该页面最底部)。

相关问题