Linux下高阶IO

x33g5p2x  于2022-07-20 转载在 Linux  
字(19.7k)|赞(0)|评价(0)|浏览(500)

一.五种IO模型

1.1阻塞IO

1.2非阻塞IO

1.3信号驱动IO

1.4多路转接IO

1.5异步IO

二.高级IO的重要概念

2.1同步与异步通信

2.2阻塞与非阻塞

三.多路转接模型

3.1select模型

3.2 poll模型

3.3  epoll

一.五种IO模型

内存和外设进行数据交换的过程叫做IO,在以前学过的网络里面一台主机将数据跨网络传送给另外一台主机,数据要从另外一台的网卡当中读取到内存当中。我们可以认为所有的IO过程可以分为两个步骤:第一:等待数据就绪。第二:拷贝数据。在实际情况下等的时间往往是要高于拷贝数据的时间。如果我们想要提高IO的效率,就要尽可能的减少等待的时间。我们在学习网络的时候学习到的recv和recvfrom我们在调用这些函数时如果内核的接收缓存区没有时间此时他会卡在哪里看起来好像阻塞在哪里了,本质其实是进程被挂起了,等待数据就绪。

1.1阻塞IO

阻塞IO: 在内核将数据准备好之前, 系统调用会一直等待. 所有的套接字, 默认都是阻塞方式.举个例子:
比如说你在楼下打电话叫你的室友出去吃饭,你室友说等下我马上就好。然后你知道了但是只要他没有下楼你不会把电话挂断一直在等他下楼直到他下楼了,你才把电话挂断。

阻塞IO一般都比较的简单很容易编写代码,但是如果数据不就绪则会阻塞住。也就是进程会被挂起

1.2非阻塞IO

非阻塞IO: 如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回EWOULDBLOCK错误码或者EAGE
非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询. 这对CPU来说是较大的浪费, 一般只有特定场景下才使用.

非阻塞IO和这个阻塞IO的区别就是:IO调用如果不就绪是否立即返回没有立即返回就是阻塞IO,立即返回了就是这个非阻塞IO
下面我们来看看如何非阻塞函数的接口

函数原型fcntl:

int fcntl(int fd,int cmd,.........)

第一个参数fd:你要对哪个文件描述符进行操作
 第二个参数cmd:对应fctl的功能

剩下的是可变参数列表 

返回值调用成功返回0失败返回-1。cmd的主要选项:

本质上这些大写的宏就都是一个32位的整数并在32位中只有一个位为1
通过fcntl实现一个非阻塞文件描述符,对应代码:

#include <iostream>
#include <fcntl.h>
#include <unistd.h>
using namespace std;
#define NUM 1024
bool SetNoBlock(int fd)
{
  int f1 = fcntl(fd, F_GETFL);
  //获取文件的的标记
  if (f1 < 0)
  {
    cerr << "fcnt errror" << endl;
    return false;
  }
  fcntl(fd, F_SETFL, f1 | O_NONBLOCK); //设置非阻塞
  return true;
}

int main()
{
  SetNoBlock(0);
  //如果非阻塞读取数据时如果没有就绪read是以出错的形式返回的
  while (true)
  {
    char buffer[NUM] = {0};

    ssize_t size = read(0, buffer, sizeof(buffer) - 1);
    if (size < 0)
    {
      //不一定是出错了可能是底层没有数据
      cerr << "read error" << endl;
      if (errno == ERANGE || errno == EWOULDBLOCK)
      {
        cout << "底层数据没有就绪在尝试一次吧" << endl;
        continue;
      }
      else if (errno == EINTR)
      {
        cout << "被信号打断" << std::endl;
      }
      else
      {
        cout << "出错了" << endl;
      }
    }
    //读取成功
    buffer[size] = '\0';
    std::cout << "echo ###############################:" << buffer << endl;
  }
  return 0;
}

注意:如果recv的返回值为负数并不意味着读取失败我们还需要对这个错误码就行检测如果是这个EAGE或者EWOULDBLOCK说明底层数据没有就绪,如果是这个EINTR说明被这个信号打断

1.3信号驱动IO

信号驱动IO: 内核将数据准备好的时候内核使用SIGIO信号通知进程来进行读取数据

首先安装一个信号处理函数,此时进程继续运行并不阻塞。当数据准备好时,进程会收到一个SIGIO信号,可以在信号处理函数中调用I/O操作函数读数据,最后拷贝完成后再唤醒用户进程让它去处理数据。信号驱动式看起来是很美好的IO模式,但它有两个缺点:

  1. 在大量 IO 操作时可能会因为信号队列溢出导致没法通知
  2. 信号驱动 I/O 尽管对于处理 UDP 套接字来说有用,信号通知意味着到达一个数据报,或者返回一个异步错误,但是对于TCP而言,导致信号通知的情况有非常多种,连接断开,连接可读,连接可写等等都会产生 Signal,每一个来进行判别会消耗很大资源 这篇文章有详细的解释

1.4多路转接IO

多路转接IO就好像这个钓鱼的时候了,我们可以拿100个鱼竿这样我们调用的效率肯定要比你拿这个一个鱼竿的效率要高。实际上就是让内核帮我们关心多个文件描述符只要有一个就绪了,就会通知调用者

1.5异步IO

由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据。异步 IO 看上去和 Signal Driven IO 很相似,但区别就在于信号驱动式IO在数据准备好后,仍然是用户进程进行系统调用拷贝数据,而异步IO是在内核直接将数据拷贝好,才会通知用户进程,直接处理数据。

 二.高级IO的重要概念

2.1同步与异步通信

  • 所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回. 但是一旦调用返回,就得 到返回值了; 换句话说,就是由调用者主动等待这个调用的结果。
  • 异步则是相反,发出调用后立即返回调用内的工作由别人完成自己是不会进行参与的等待被调用者者通知,直接使用即可

注意:在这里同步通信的概念和这个进程间的同步是完成不相干的概念:进程/线程同步也是进程/线程之间直接的制约关系 是为完成某种任务而建立的两个或多个线程,这个线程需要在某些位置上协调他们的工作次序而等待、 传递信息所产生的制约关系. 尤其是在访问临界资源的时候。

2.2阻塞与非阻塞

阻塞和非阻塞讨论的是在等待调用结果时的状态:

  • 阻塞调用是指调用结果返回之前,当前线程会被挂起. 调用线程只有在得到结果之后才会返回.
  • 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程
  • 所谓的阻塞在用户层看来就是卡住了,而在OS层面看来就是这个这个进程不挂起了
  • 非阻塞本质是用户发起请求,OS帮我来检测资源是否就绪。

三.多路转接模型

多路转接可以帮我们监测大量的文件描述符对应的事件,这些事件包括了可读事件,可写事件,异常事件。当添加就绪了就会通知调用者事件就绪了,你快来读取吧

3.1select模型

select,poll,epoll这三个函数是Linux系统中I/O复用的系统调用函数。I/O复用是的这几个函数可以同时监听多个文件描述符。select是三者之中使用起来最难的也是最底层的。它的事件轮询机制是基于比特位的。每次遍历都需要遍历整个事件列表。下面我们来看看select函数的原型:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

个人觉得要理解select我们首先就要理解select系统调用当中的fd_set这个数据结构。每个select都要处理一个fd_set的结构,fd_set在内核当中其实就是一个位图和学习信号时候我们学的sigset_t 一样是一个位图结构。每个比特位代表用户让内核关心的fd,如果为1那么就表示这个fd用户想要我们关心,否则就没有。

注意:我们不能用我们在C语言中学过的位运算对他直接进行操作,必须使用OS给我们提供的接口。这是因为在不同的OS中这个位图的实现不一定是整数。

下面让我们来看看这一些OS提供给我们操作位图的接口:

void FD_CLR(int fd,fd_set*set);

int FD_ISSET(int fd,fd_set*set);

void FD_SET(int fd,fd_set*set);

void FD_ZERO(fd_set*set);

函数说明:

  • 第一个函数FD_CLR:用来清除set位图中的对应的fd位置即将set位图当中的fd比特位置为0
  • 第二个函数FD_ISSET:用来判断fd是否在set位图当中即检测对应set位图当中fd位图比特位的值是否为1
  • 第三个函数FD_SET:用来设置fd即将set位图当中对应第fd个比特位的位置设置为1
  • 第四个函数FD_ZERO:清空位图将位图当中所有的比特位设置为0

注意:在内核当中这个位图比特位的数量是有限的,也就意味着我们想要select帮我们关心的文件描述符是有限的。

下面我们再来详细看看select函数每个参数的含义:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
  • nfds表示文件描述符的最大值+1在这里需要加1的原因是内核进行检测是通过变量的形式进行检测就类似于for(int i=0;i<nfds;i++);这样进行检测。
  • readfds:表示监测读事件的位图,如果没有那么就设置为NULL
  • writefds:表示监测写事件的位图,如果没有可以设置为NULL
  • exceptfds:表示监测异常事件的位图,没有可以设置为NULL
  • timeout:表示这个阻塞时间如果设置为0代表非阻塞,如果设置为NULL代表永久阻塞直到有事件到来,如果设置这个结构体里面的值代表在设置的返回内阻塞超过了这个时间直接返回也就是超时。

注意:readfds、writefds和exceptfds为这个输入输出型参数,用户定义的这几个位图OS会对这个几个位图进行修改,OS如何告诉我们那些文件描述符就绪了。其实OS会将就绪的位图设置1,没有就绪的位图设置为0.也就是说用户需要自己保存一份自己想要内核关心的文件描述符。

其中timeval的结构体定义如下:

struct timeval {
               long    tv_sec;         /* seconds */
               long    tv_usec;        /* microseconds */
           };

其中tv_sec代表的是这个秒,tv_usec代表的是这个微秒。

select函数的返回值有三个:

  • 大于0:代表就绪事件的总个数即readfds和writefds就绪事件的总数由于博主水平一般所以exceptfds没有使用过在这里不考虑
  • 等于0:代表超时返回0
  • 小于0:代表出错

下面我们来看看使用select来编写简单的服务器,在这里为了简单我们只关心读事件:

首先我们将套接字的一系列操作进行封装:

1.sock.hpp

#pragma once
#include <iostream>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <netinet/in.h>
#include <cstring>
namespace ksy
{
    class Sock
    {
    public:
        static int Socket()
        {
            int sock = socket(AF_INET, SOCK_STREAM, 0);
            int opt = 1;
            setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
            //端口复用
            if (sock < 0)
            {
                std::cerr << "socket error" << std::endl;
                exit(1);
            }
            return sock;
        }
        static bool Bind(int sock, unsigned short port)
        {
            struct sockaddr_in local;
            memset(&local, 0, sizeof(local));
            local.sin_addr.s_addr = INADDR_ANY;
            local.sin_family = AF_INET;
            local.sin_port = htons(port);
            if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
            {
                std::cerr << "bind error" << std::endl;
                exit(2);
            }

            return true;
        }

        static bool Listen(int sock, int backlog)
        {
            if (listen(sock, backlog) < 0)
            {
                std::cerr << "listen error" << std::endl;
            }
            return true;
        }
    };
}

2.SelectServer.hpp

#pragma once
#include "sock.hpp"
#include <sys/select.h>
#define BACK_LOG 5
#define NUM 1024
#define DFL_FD -1
namespace ksy_select
{
    class SelectServer
    {
    private:
        int listen_sock;
        unsigned short port;

    public:
        SelectServer(unsigned short _port) : port(_port)
        {
        }
        ~SelectServer()
        {
        }

        void InitSelectServer()
        {
            listen_sock = ksy::Sock::Socket();
            ksy::Sock::Bind(listen_sock, port);
            ksy::Sock::Listen(listen_sock, BACK_LOG);
        }

        void Run()
        {
            fd_set rfds;
            int fd_arry[NUM] = {0};          //保存所有的文件描述符
            ClearArry(fd_arry, NUM, DFL_FD); //用来初始化数组中所有的fd
            fd_arry[0] = listen_sock;        //把监听套接字sock写入数组的第一个元素

            for (;;)
            {
                //时间也是输入输出,如果你是间隔性的timeout返回,那么就需要对时间重写设定
                struct timeval timeout = {5, 0};
                //对所有的合法fd进行在select中重新设定
                FD_ZERO(&rfds); //清空所有的读文件描述符集
                //第一次循环的时候,我们的fd_arry数组至少有了一个fd,listen_sock
                int max_fd = DFL_FD;
                for (int i = 0; i < NUM; i++)
                {
                    if (fd_arry[i] == DFL_FD)
                    {
                        continue;
                    }
                    //说明是需要添加的合法fd
                    FD_SET(fd_arry[i], &rfds);
                    if (fd_arry[i] > max_fd)
                    {
                        max_fd = fd_arry[i]; //更新最大文件描述符
                    }
                }
                // select 阻塞等待
                // timeval={0}非阻塞轮询
                //阻塞轮询+轮询timeval={5,0},5秒之内,select返回无论是否有事件就绪

                switch (select(max_fd + 1, &rfds, nullptr, nullptr, &timeout))
                {
                case 0://超时
                    std::cerr << "time out" << std::endl;
                    break;
                case -1://出错
                    std::cerr << "select error" << std::endl;
                    break;
                default:
                    //正常的事件处理
                    std::cout << "有事件发生....... timeout:" << timeout.tv_sec << std::endl;
                    HandEvent(rfds, fd_arry, NUM);
                } // end switch
            }
        }

    private:
        void ClearArry(int fd_arry[], int num, int defalult_fd)//将文件描述符设置为defalutlt_fd
        {
            for (int i = 0; i < num; i++)
            {
                fd_arry[i] = defalult_fd;
            }
        }
        void HandEvent(const fd_set &rfds, int fd_array[], int num)
        {
            //如何判断那些文件描述符就绪了只需要特定的fd是否在rfds当中
            // FD_INSET我都有那些文件描述符了fd_arry[]
            for (auto i = 0; i < num; i++)
            {
                if (fd_array[i] == DFL_FD)
                {
                    continue;
                }
                //链接和读事件就绪
                if (fd_array[i] == listen_sock && FD_ISSET(fd_array[i], &rfds))
                {
                    //是一个合法的fd并且已经就绪了并且是链接事件到来
                    // accept
                    struct sockaddr_in perr;
                    socklen_t len = sizeof(perr);
                    int sock = accept(listen_sock, (struct sockaddr *)&perr, &len);
                    //不会阻塞已经就绪了
                    int perr_port = ntohs(perr.sin_port);
                    std::string perr_ip = inet_ntoa(perr.sin_addr);
                    std::cout << "get a new link" << std::endl;
                    std::cout << perr_ip << ": " << perr_port << std::endl;

                    if (sock < 0)
                    {
                        std::cerr << "accept error" << std::endl;
                        continue;
                    }
                    //绝对不可以读取,链接建立好不代表对方把数据给你了
                    //将该文件描述符添加的fd_arry数组当中,为什么
                    if (!AddFdToArray(fd_array, num, sock))
                    {
                        std::cout << "select server is full ,close fd:" << sock << std::endl;
                        close(sock);
                    }
                }
                else
                {
                    //处理正常的fd
                    if (FD_ISSET(fd_array[i], &rfds))
                    {
                        //在这里进行读写不会被阻塞可以直接调用recv()
                        char buffer[1024];
                        //你能确定你读完了请求了吗
                        //如果我一条链接给你发了多个请求但是都只有10个字节粘包问题
                        //如果没有读到一个完整的报文数据有可能丢失这里我们保证自己能够拿到完整的数据了
                        // TCP面向字节流需要我们定制协议,还要给每一个sock定义对应的缓存区
                        ssize_t s = recv(fd_array[i], buffer, sizeof(buffer) - 1, 0);
                        if (s > 0)
                        {
                            buffer[s] = 0;
                            std::cout << "echo#: " << buffer << std::endl;
                        }
                        else if (s == 0) //对端把链接关闭了
                        {
                            std::cout << "client close" << std::endl;
                            close(fd_array[i]);
                            fd_array[i] = -1;
                            //清除数组中的文件描述符
                        }

                        else
                        {
                            close(fd_array[i]);
                            std::cerr << "recv error" << std::endl;
                        }
                    }

                    else
                    {
                    }
                }
            }
        }
        //将sock这个套接字发到数组当中
        bool AddFdToArray(int fd_arry[], int num, int sock)
        {
            for (int i = 0; i, num; i++)
            {
                if (fd_arry[i] == DFL_FD) //说明该位置没有被使用
                {
                    fd_arry[i] = sock;
                    return true;
                }
            }
            return false;
        }
    };
}

3.server.cc

#include "Select_server.hpp"
#include <iostream>
#include <string>
#include <cstdlib>
static void Usage(std::string proc)
{
  std::cerr << "Usage:"
            << "\n\t" << proc << "port" << std::endl;
}
// select的缺点
// select 的缺点能够等待的文件描述符是有限的
// select需要和OS交换数据,涉及到较多数据来回拷贝,当select面临的链接很多时,就绪的也很多时,会因为拷贝数据而导致效率降低
// select每次调用都必须重写添加fd一定会影响程序的运行效率,并且非常的麻烦容易出错
// select(nfds):maxfd+1 OS在检测fd就绪时需要遍历。所以当有大量链接的时候内核会同步select底层遍历成本会变得很高

//有点 select 可以同时等待多个fd 而且只复杂等待 有具体的accept recv send  不会被阻塞。任何一个fd就绪的概率增加了,我们的
//服务器在单位时间内等的比重就在降低 可以提高效率
//使用大量的链接并且链接不时很活跃聊天工具

// poll解决了select的文件描述符是有上限的
// poll将用户告诉内核让OS帮我关心那些文件描述符的那些事件,将内核告诉用户那些文件描述符的那些是就绪的分离
//不用每次调用poll的时候重新添加fd,fd关心的事件
int main(int argc, char *argv[], char *env[])
{
  if (argc != 2)
  {
    Usage(argv[0]);
  }
  unsigned short port = atoi(argv[1]);
  ksy_select::SelectServer *select_svr = new ksy_select::SelectServer(port);
  select_svr->InitSelectServer();
  select_svr->Run();
  return 0;
}

select的优点:

select 可以同时等待多个fd 而且只复杂等待 有具体的accept recv send  不会被阻塞。任何一个fd就绪的概率增加了,我们的服务器在单位时间内等的比重就在降低 可以提高效率使用大量的链接并且链接不时很活跃聊天工具。

select的缺点:

  • 能够同时等待的文件描述符是有限的
  • 每次调用select都需要将fd集合从用户太拷贝到内核态当fd很多时开销比较大
  • 每次调用select都需要在内核态遍历传进来的所有fd当fd数量很多时,效率会降低
  • select每次调用都必须重写添加fd一定会影响程序的运行效率,并且非常的麻烦容易出错

3.2 poll模型

poll我们可以认为是select的升级版主要解决了select能够同时等待的文件描述符的数量是有的 

poll将用户告诉内核让OS帮我关心那些文件描述符的那些事件,将内核告诉用户那些文件描述符的那些是就绪的分离,不用每次调用poll的时候重新添加fd,fd关心的事件。

下面我们来看看poll底层的数据结构pollfd:

struct pollfd {
               int   fd;         /* file descriptor */
               short events;     /* requested events */
               short revents;    /* returned events */
           };

我们在使用这个结构时不再是对特定的比特位进行操作而是对本身的事件进行操作。同时的我们在使用之前将事件全部初始化为0可以使用memset或者这个bzero。监听的时候在revents上监听即可。注册事件使用|操作,查询事件使用&操作。比如想要注册POLLIN数据到来的事件,需要pfd.events |= POLLIN,注册多个事件进行多次|操作即可。取消事件进行~操作,比如pfd.events ~= POLLIN。查询事件:pfd.revents & POLLIN。其中events和revents的取值主要有如下几种:

下面让我们来看看poll函数:

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

参数说明:

  • fds:一个pollfd队列的队头指针,我们需要将需要监视的文件描述符和对应的事件添加到这个队列当中
  • nfds队列的长度
  • timeout:事件操作timeout大于0代表在指定时间内阻塞超过这个时间就超时了返回,如果设置为-1代表永久阻塞,0代表非阻塞模式

下面给出一个简单的poll服务器设计:

1.sock.hpp

#pragma once
#include <iostream>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <netinet/in.h>
#include <cstring>
namespace ksy
{
    class Sock
    {
    public:
        static int Socket()
        {
            int sock = socket(AF_INET, SOCK_STREAM, 0);
            int opt = 1;
            setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
            //端口复用
            if (sock < 0)
            {
                std::cerr << "socket error" << std::endl;
                exit(1);
            }
            return sock;
        }
        static bool Bind(int sock, unsigned short port)
        {
            struct sockaddr_in local;
            memset(&local, 0, sizeof(local));
            local.sin_addr.s_addr = INADDR_ANY;
            local.sin_family = AF_INET;
            local.sin_port = htons(port);
            if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
            {
                std::cerr << "bind error" << std::endl;
                exit(2);
            }

            return true;
        }

        static bool Listen(int sock, int backlog)
        {
            if (listen(sock, backlog) < 0)
            {
                std::cerr << "listen error" << std::endl;
            }
            return true;
        }
    };
}

2.pollserver.hpp

#include "sock.hpp"
#include <sys/poll.h>

namespace ns_poll
{
    class PollServer
    {
    private:
        int listen_sock;
        int port;

    public:
        PollServer(int _port) : port(_port)
        {
        }
        void InitServer()
        {
            listen_sock = ksy::Sock::Socket();
            ksy::Sock::Bind(listen_sock, port);
            ksy::Sock::Listen(listen_sock, 5);
        }
        bool AddEvent(struct pollfd rfds[], int num, int sock) //添加事件
        {
            for (int i = 0; i < num; i++)
            {
                if (rfds[i].fd == -1)
                {
                    rfds[i].fd = sock;
                    rfds[i].events |= POLLIN;
                    rfds[i].revents = 0;
                    return true;
                }
            }
            return false;
        }
        void Run()
        {
            struct pollfd rfds[64];
            //由于pollfd数组的大小是用户指定的所以这个能够添加的文件描述符是没有上限的
            for (int i = 0; i < 64; i++)
            {
                rfds[i].fd = -1;
                rfds[i].events = 0;
                rfds[i].revents = 0;
            }

            rfds[0].fd = listen_sock;
            rfds[0].events |= POLLIN; //让操作系统帮我们关心读事件
            rfds[0].revents = 0;
            for (;;)
            {
                switch (poll(rfds, 64, -1)) //永久等待
                {
                case 0:
                    std::cout << "time out" << std::endl;
                    break;
                case -1:
                    std::cerr << "pool error" << std::endl;
                    break;

                default:
                    for (int i = 0; i < 64; i++)
                    {
                        if (rfds[i].fd == -1)
                        {
                            continue;
                        }
                        else
                        {
                            if (rfds[i].revents & POLLIN)
                            {
                                if (rfds[i].fd == listen_sock)
                                {
                                    // accept;
                                    std::cout << "get a new link" << std::endl;
                                    struct sockaddr_in peer;
                                    socklen_t len = sizeof(peer);
                                    int sock = accept(listen_sock, (struct sockaddr *)&peer, &len);
                                    if (!AddEvent(rfds, 64, sock))
                                    {
                                        std::cerr << "满了 无法进行添加" << std::endl;
                                        close(rfds[i].fd);
                                    }
                                }
                                else
                                {   //进行读取
                                    //注意这里其实是不对的,需要定制协议在这里简单起见就这样简单的设置
                                    char buffer[1024] = {0};
                                    ssize_t s = recv(rfds[i].fd, buffer, sizeof(buffer) - 1, 0);
                                    if (s > 0)
                                    {
                                        buffer[s] = 0;
                                        std::cout << buffer << std::endl;
                                        send(rfds[i].fd, buffer, sizeof(buffer), 0);
                                    }
                                    else if (s == 0)
                                    {
                                        //对端关闭
                                        close(rfds[i].fd);
                                        rfds[i].fd = -1;
                                        rfds[i].events = 0;
                                    }
                                    else
                                    {
                                        //出错了
                                        //对端关闭
                                        close(rfds[i].fd);
                                        rfds[i].fd = -1;
                                        rfds[i].events = 0;
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    };
}

3.server.cc

#include "server.hpp"
#include <iostream>
#include <string>
#include <cstdlib>
static void Usage(std::string proc)
{
  std::cerr << "Usage:"
            << "\n\t" << proc << "port" << std::endl;
}

int main(int argc, char *argv[], char *env[])
{
  if (argc != 2)
  {
    Usage(argv[0]);
  }
  unsigned short port = atoi(argv[1]);
  ns_poll::PollServer*select_svr = new ns_poll::PollServer(port);
  select_svr->InitServer();
  select_svr->Run();
  return 0;
}

poll的缺点:

  • 但是poll在OS层面任然需要遍历当添加的文件描述符数量比较多的时候效率依然会降低
  • 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符. 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中

3.3  epoll

epoll是一种I/0事件通知机制在Linux当中是多路复用的一个实现这点和select和poll一样。epoll有两种工作方式, LT-水平触发 和ET-边缘触发(默认工作方式),主要区别是:LT,内核通知你fd是否就绪,如果没有处理,则会持续通知。而ET,内核只通知一次。而select和这个poll都是这个LT即水平模式。

下面让我们来看看epoll的三个函数:

1.epoll_creat

int epoll_create(int size);

函数说明:

  • size参数告诉内核这个epoll对象处理的事件大致数量,而不是能够处理的最大数量。
  • 在现在的linux版本中,这个size函数已经被废弃(但是size不要传0,会报invalid argument错误)。
  • 内核会产生一个epoll 实例数据结构并返回一个文件描述符,这个特殊的描述符就是epoll实例的句柄,之后针对该epoll的操作需要通过该句柄来标识该epoll对象。

当我们使用epoll_create时内核会创建一颗红黑树和一个就绪队列,用于后序保存用户让内核的文件描述符。

2.epoll_ctl

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

被本质上epoll_ctl是对epoll_create创建的红黑树进行增删改,如果我添加一个事件到红黑树当中底层会注册一个回调函数当对应的事件就绪时,会调用这个回调函数将该事件的相关信息封装成一个节点链入就绪队列当中。

函数说明:

  • 将被监听的描述符添加到红黑树或从红黑树中删除或者对监听事件进行修改
  • 返回:0表示成功,-1表示错误,根据errno错误码判断错误类型

epoll_event取值:
EPOLLIN:描述符处于可读状态
EPOLLOUT:描述符处于可写状态
EPOLLET:将epoll event通知模式设置成edge triggered
EPOLLONESHOT:第一次进行通知,之后不再监测
EPOLLHUP:本端描述符产生一个挂断事件,默认监测事件
EPOLLRDHUP:对端描述符产生一个挂断事件
EPOLLPRI:由带外数据触发
EPOLLERR:描述符产生错误时触发,默认检测事件

op参数说明操作类型:
EPOLL_CTL_ADD:向interest list添加一个需要监视的描述符
EPOLL_CTL_DEL:从interest list中删除一个描述符
EPOLL_CTL_MOD:修改interest list中一个描述符

3.epoll_wait

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

函数说明:

  • 阻塞等待注册的事件发生,返回事件的数目,并将触发的事件写入events数组中
  • events: 用来记录被触发的events,其大小应该和maxevents一致
  • maxevents: 期望返回就绪的events的个数
  • 参数timeout描述在函数调用中阻塞时间上限,单位是ms:
  •  timeout = -1表示调用将一直阻塞,直到有文件描述符进入ready状态或者捕获到信号才返回
  •     timeout = 0用于非阻塞检测是否有描述符处于ready状态,不管结果怎么样,调用都立即返回;
  •     timeout > 0表示调用将最多持续timeout时间,如果期间有检测对象变为ready状态或者捕获到信号则返回,否则直到超时。

4.epoll的工作原理

1.通过调用epoll_create,在epoll文件系统建立了个file节点,并开辟epoll自己的内核高速cache区,建立红黑树,分配好想要的size的内存对象,建立一个双向链表,用于存储准备就绪的事件。
2.通过调用epoll_ctl,把要监听的文件描述符放到对应的红黑树上,给内核中断处理程序注册一个回调函数,通知内核,如果这个句柄的数据到了,就把它放到就绪队列当中
3.通过调用 epoll_wait,观察等待队列里面有没有数据这个操作时间复杂度为O(1)不用像select和poll需要依次遍历。

5.实现一个简单版本的epoll服务器

1.sock.hpp

#pragma once
#include <iostream>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <netinet/in.h>
#include <cstring>
namespace ksy
{
    class Sock
    {
    public:
        static int Socket()
        {
            int sock = socket(AF_INET, SOCK_STREAM, 0);
            int opt = 1;
            setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
            //端口复用
            if (sock < 0)
            {
                std::cerr << "socket error" << std::endl;
                exit(1);
            }
            return sock;
        }
        static bool Bind(int sock, unsigned short port)
        {
            struct sockaddr_in local;
            memset(&local, 0, sizeof(local));
            local.sin_addr.s_addr = INADDR_ANY;
            local.sin_family = AF_INET;
            local.sin_port = htons(port);
            if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
            {
                std::cerr << "bind error" << std::endl;
                exit(2);
            }

            return true;
        }

        static bool Listen(int sock, int backlog)
        {
            if (listen(sock, backlog) < 0)
            {
                std::cerr << "listen error" << std::endl;
            }
            return true;
        }
    };
}

2.epollServer.hpp

#pragma once
#include "sock.hpp"
#include <sys/epoll.h>
namespace ns_epoll
{
    const int back_log = 5;
    const size_t size = 256;
#define MAX_NUM 64
    class EpollServer
    {
    private:
        int epfd;
        int listen_sock;
        uint16_t port;

    public:
        EpollServer(int _port) : port(_port)
        {
        }
        ~EpollServer()
        {
            if (listen_sock >= 0)
            {
                close(listen_sock);
            }
            if (epfd >= 0)
            {
                close(epfd);
            }
        }

    public:
        void InitEpollServer()
        {
            listen_sock = ksy::Sock::Socket();
            ksy::Sock::Bind(listen_sock, port);
            ksy::Sock::Listen(listen_sock, back_log);
            std::cout << "dbug listen_sock: " << listen_sock << std::endl;
            if ((epfd = epoll_create(size)) < 0)
            {
                std::cerr << "epoll_creat error" << std::endl;
                exit(4);
            }
            std::cout << "epoll_create :" << epfd << std::endl;
        }
        void ADDEvent(int sock, uint32_t event)
        {
            struct epoll_event ev;
            ev.events = 0;
            ev.events |= event; //非必须的
            ev.data.fd = sock;

            if (epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev))
            {
                std::cerr << "epoll_ctl error:" << sock << std::endl;
            }
        }

        void DelEvent(int sock)
        {
            if (epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr) < 0)
            {
                std::cerr << "epoll_ctl_error,fd" << sock << std::endl;
                return;
            }
        }
        void Run()
        {
            //在这里我们目前只有一个socket能够关心读写的
            int timeout = -1;
            //    const int maxNum=64;
            struct epoll_event recvs[MAX_NUM];

            ADDEvent(listen_sock, EPOLLIN);//添加事件
            for (;;)
            {
                //返回值表明有多少个事件就绪了,内核会将就绪事件会将就绪事件一次recvs当中
                int num = epoll_wait(epfd, recvs, MAX_NUM, timeout);
                if (num > 0)
                {
                    // std::cerr<<"有事件就绪了"<<std::endl;
                    for (int i = 0; i < num; i++)
                    {
                        int Sock = recvs[i].data.fd;
                        if (recvs[i].events & EPOLLIN) //读事件就绪
                        {
                            if (listen_sock == Sock) //链接就绪
                            {
                                struct sockaddr_in peer;
                                socklen_t len = sizeof(peer);

                                int sk = accept(listen_sock, (struct sockaddr *)&peer, &len);
                                if (sk < 0)
                                {
                                    std::cerr << "accept error" << std::endl;
                                    continue;
                                }
                                //不可以
                                std::cout << "get a new link " << inet_ntoa(peer.sin_addr) << ":" << ntohs(peer.sin_port) << std::endl;
                                ADDEvent(sk, EPOLLIN); //先进行读取,只有需要写入的时候才主动设置epollout
                            }
                            else
                            {
                                //可读事件
                                char buffer[1024];
                                //需要进行读取
                                ssize_t s = recv(Sock, buffer, sizeof(buffer) - 1, 0);
                                if (s > 0)
                                {
                                    buffer[s - 1] = 0;
                                    std::cout << buffer << std::endl;
                                }
                                else
                                {
                                    std::cout << "client close" << std::endl;
                                    close(Sock);
                                }
                            }
                        }
                        else if (recvs[i].events & EPOLLOUT)
                        {
                        }

                        else
                        {
                            //?
                        }
                    }
                }
                else if (num == 0) //超时了
                {
                    std::cout << "time out" << std::endl;
                }
                else
                {
                    std::cerr << "epoll error" << std::endl;
                }
            }
        }
    };
}

3.server.cc

#include <iostream>
#include <cstdlib>
#include "epoll_sever.hpp"
static void Usage(std::string proc)
{
    std::cout << "Usage: " << proc << std::endl;
}
int main(int argc, char *argv[], char *env[])
{
    if (argc != 2)
    {
        Usage(argv[0]);
        return 5;
    }
    int port = atoi(argv[1]);
    ns_epoll::EpollServer *ep_svr = new ns_epoll::EpollServer(port);
    ep_svr->InitEpollServer();
    ep_svr->Run();
    return 0;
}

注意:

epoll有两种工作模型:LT、EL。默认是LT模式。LT,内核通知你fd是否就绪,如果没有处理,则会持续通知。而ET,内核只通知一次。这就意味着如果我们使用ET模式那么就意味着我们不能使用阻塞读取因为我们读取一次我们不能确定我们是否读取完毕,而如果我们使用循环读取当我们读取两次之后数据没有了此时我们在进行读取,那么我们的服务器就会被挂起,如果对方不放消息给我们服务器一直挂起不能接受新的链接和挂了没有什么区别,所以ET模式下我们不能进行阻塞读取,需要使用非阻塞读取。

6.epoll和select的对比:

1.接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文 件描述符, 也做到了输入输出参数分开 2.数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频 繁(而select/poll都是每次循环都要进行拷贝) 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述 符数目很多, 效率也不会受到影响. 没有数量限制: 文件描述符数目无上限.

相关文章