Linux高级IO模型

The Redefine Team Lv3

Linux 网络高级 IO

五种IO模型

本文将介绍Linux网络编程中的五种常见网络高级IO函数

  1. 阻塞IO
    1. 在内核将数据准备好之前, 系统调用会一直等待.
    2. 所有的套接字, 默认都是阻塞方式.
  2. 非阻塞IO
    1. 如果内核还未将数据准备好, 系统调用仍然会直接返回
    2. 返回EWOULDBLOCK错误码.
    3. 该IO模式常需要以循环的方式反复尝试读写文件描述符, 这个过程称为轮询
    4. 但是轮询会带来大量的CPU资源浪费,所以该IO模式一般只在特定场景下应用
  3. 信号驱动IO
    1. 内核将数据准备好的时候, 使用SIGIO信号通知应用程序进行IO操作
  4. IO多路转接
    1. 类似于阻塞IO,
    2. 最核心在于IO多路转接能够同时等待多个文件 描述符的就绪状态
  5. 异步IO
    1. 在数据拷贝完成时, 内核会通知应用程序
    2. 信号驱动则是告诉应用程序何时可以开始拷贝数据

总结:

所有的IO均可看作等待 + 拷贝,其中等待的时间常常远高于拷贝,所以高效的IO模式都是尽量减少等待的时间(后面会具体介绍);

高级IO重要概念

同步通信 (synchronous communication) VS 异步通信 (asynchronous communication)

同步和异步关注的是消息通信机制:

  • 同步

直接由调用者进行等待,在处理完成之前,调用者会一直等待该处理结果的返回,期间调用者不会进行其他的操作;

  • 异步

调用者不会一直进行阻塞式等待,当处理完成后,被调用者会通过一系列的机制(回调函数、信号等)通知调用者,调用者收到对应的完成信号后执行后续操作;

然而,需要注意与多进程/多线程同步/互斥的区别,两者毫不相关:

  • 进程/线程同步也是进程/线程之间直接的制约关系

  • 是为完成某种任务而建立的两个或多个线程,这个线程需要在某些位置上协调他们的工作次序而等待、 传递信息所产生的制约关系. 尤其是在访问临界资源的时候

阻塞 VS 非阻塞

阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态

  • 阻塞调用是指调用结果返回之前,对应的线程会被挂起,直到收到响应信号时才会返回;
  • 非阻塞则不同于阻塞,即使未收到响应信号,线程也不会被挂起,而是继续执行接下来的操作;

其他高级IO

非阻塞IO,纪录锁,系统V流机制,I/O多路转接(也叫I/O多路复用),readv和writev函数以及存储映射 IO(mmap),这些统称为高级IO
本文重点讨论 I/O 多路转接;

非阻塞IO

fcntl

文件描述符控制函数,默认是阻塞 IO

函数原型

1
2
3
4
#include <iostream>
#include <fcntl.h>

int fcntl(int fd, int cmd, ... /* arg */);

参数说明

cmd

F_DUPFD: 复制一个现有的描述符

F_GETFD / F_SETFD: 获得/设置文件描述符标记

F_GETFL / F_SETFL: 获得/设置文件状态标记

F_GETOWN / F_SETOWN: 获得/设置异步I/O所有权

F_GETLK / F_SETLK / F_SETLKW: 获得/设置记录锁

用法实例

——以设置文件描述符为非阻塞模式为例

1
2
3
4
5
6
7
8
9
10
void SetNoBlock(int fd){
// 获取当前文件描述符模式
int flag = fcntl(fd, F_GETFL);
if(flag < 0){
std::cerr << "ERROR FD" << std::endl;
return;
}
// 设置文件描述符为非阻塞模式
fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}

细节

这里获取的flag,请注意它是一个位图结构,关于位图结构,可以粗略的将它看作是一个_比特位容器_,每个位置上的0/1值都代表着不同的含义,当然位图的实现依据不同的功能有不同的实现方案,肯定不会只是一个简单的int类型可表示的,这里只是用作理解;

以轮询方式读取标准输入

在我们完成了上述的非阻塞模式设置函数后,我们就可以基于此实现以轮询方式读取标准输入的功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>

void SetNoBlock(int fd){
// 获取当前文件描述符模式
int flag = fcntl(fd, F_GETFL);
if(flag < 0){
std::cerr << "ERROR FD" << std::endl;
return;
}
// 设置文件描述符为非阻塞模式
fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}

int main()
{
// 标准输入的文件描述符默认是0
// 1 -> 标准输出
// 2 -> 标准错误
SetNoBlock(0);
// 死循环模拟轮询状态
for(;;)
{
char buffer[1024];
// 读取
// 以非阻塞方式
ssize_t s = read(0, buffer, sizeof(buffer) - 1);
if(s < 0){
std::cerr << "ERROR READ" << std::endl;
sleep(1);
continue;
}
std::cout << "GetInput # " << buffer << std::endl;
}
return 0;
}

I/O多路转接

什么是多路转接:通俗的理解就是多个信号或数据流共享一条通信管道,通过各种机制(如信号)实现数据的有序传递,下面分别介绍几种常见的通信机制:

Select

初识select

系统提供select函数来实现多路复用输入/输出模型

  • select系统调用是用来让我们的程序监视多个文件描述符的状态变化的;

  • 程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变

select 函数原型

1
2
3
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);

参数解释

  • nfds:最大的可用文件描述符 + 1;
  • readfds:监听读事件的文件描述符的集合,这里的 fd_set 即表示一个文件描述符集合;
  • writefds:同上,表示被监听的写事件的文件描述符集合;
  • exceptfds:同上,表示被监听的异常事件的文件描述符集合;
  • timeout:用来设置 select 的超时时间(用来区分非阻塞和阻塞模式)

timeout 取值

  • NULL:则表示 select 没有timeout,select将一直被阻塞,直到某个文件描述符上发生了事件; ——阻塞等待
  • 0:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生; ——非阻塞等待
  • 特定的时间值:如果在指定的时间段里没有事件发生,select将超时返回; ——超时时间内阻塞,超时时间外非阻塞

fd_set

位图结构,对应的位标识所监视的文件描述符;

操作函数(位图结构不是简单类型,不可以通过位运算进行直接操作)

  • void FD_CLR(int fd, fd_set *set); // 用来清除描述词组set中相关fd 的位
  • int FD_ISSET(int fd, fd_set *set); // 用来测试描述词组set中相关fd的位是否为真
  • void FD_SET(int fd, fd_set *set); // 用来设置描述词组set中相关fd的位
  • void FD_ZERO(fd_set *set); // 用来清除描述词组set的全部位

返回值

  • 执行成功则返回文件描述词状态已改变的个数
  • 如果返回0代表在描述词状态改变前已超过timeout时间,没有返回
  • 当有错误发生时则返回-1,错误原因存于errno,此时参数 readfds, writefds, exceptfds 和 timeout 的值变成不可预测;

常见错误值

  • EBADF 文件描述词为无效的或该文件已关闭
  • EINTR 此调用被信号所中断
  • EINVAL 参数 n 为负值
  • ENOMEM 核心内存不足

select 函数执行过程

理解select函数执行过程,需要与其操作函数相结合,这里以一个字节(8位)位图为例:

  1. 首先重置位图结构——FD_ZERO,位图变为 (0000 0000);
  2. 再设置需要监听的文件描述符,这里以 fd = 4为例——FD_SET(4, &fd_set);(0001 0000)
  3. 当监听到对应的文件描述符上由事件就绪时(事件类型由对应的文件描述符集合确定,这里是阻塞等待),fd_set被重新设置,对应位上的值被置为1,而监听的文件描述符若未就绪,则会被置为0;(0001 0000)
  4. 接收端读取select返回的信号,检索fd_set,执行相应操作;

select 就绪条件

读就绪

  • 套接字 (Sockets)

    • 监听套接字 (Listening Socket, e.g., created by socket, bind, listen) 如果有新的连接请求已完成(pending connection),accept() 将不会阻塞。

    • 已连接的流套接字 (Connected Stream Socket, e.g., TCP)

      • 套接字接收缓冲区中的数据字节数大于等于套接字接收缓冲区低水位标记(SO_RCVLOWAT,默认为1)。这意味着调用 read() 或 recv() 将不会阻塞,并且会返回大于 0 的值(读取到的数据)。

        低水平标记

        1. 接收低水位标记 (Receive Low Water Mark, SO_RCVLOWAT)
        • 定义: 当套接字接收缓冲区中累积的数据字节数达到或超过这个标记值时,select/poll/epoll 才会通知应用程序该套接字“可读”。
        • 默认值: 通常为1字节。
        1. 发送低水位标记 (Send Low Water Mark, SO_SNDLOWAT)
        • 定义: 当套接字发送缓冲区中的可用空间字节数达到或超过这个标记值时,select/poll/epoll 才会通知应用程序该套接字“可写”。
        • 默认值: 通常也比较小,例如允许写入至少一个字节。

        为什么需要低水位标记?它的作用是什么?

        1. 减少不必要的唤醒和系统调用 (针对SO_RCVLOWAT):
          • 场景: 想象一个应用,如果每当接收缓冲区有一个字节到达时就被唤醒,它就需要执行一次 read() 系统调用。如果数据是零散到达的(比如,对端一次只发送几个字节),那么应用程序会被频繁唤醒,每次唤醒只处理少量数据。这会导致大量的上下文切换和系统调用开销,效率低下。
          • 作用: 通过设置一个较高的 SO_RCVLOWAT(例如,设置为1024字节),应用程序只有在接收缓冲区中累积了足够多的数据(至少1024字节)时才会被通知。这样,一次 read() 调用可以读取更多的数据,减少了唤醒次数和系统调用次数,提高了整体吞吐量和效率。
          • 权衡: 设置过高的 SO_RCVLOWAT 可能会增加延迟,因为应用程序需要等待更多数据到达才会被唤醒。
        2. 控制写入的粒度 (针对SO_SNDLOWAT):
          • 场景: 类似地,如果发送缓冲区只有少量空间可用时就通知应用程序可写,应用程序可能会尝试写入少量数据,导致频繁的 write() 调用和可能的网络小包问题(Nagle算法可能会缓解,但不是所有情况)。
          • 作用: 通过设置一个合适的 SO_SNDLOWAT,可以确保在发送缓冲区有足够空间容纳一个有意义的数据块时才通知应用程序可写。这有助于应用程序进行更有效率的批量写入。
          • 权衡: 对于某些需要低延迟的应用,即使只有少量空间也希望尽快发送数据,此时较小的 SO_SNDLOWAT(甚至默认值)可能更合适。
        3. 避免“忙等”或过于频繁的轮询:
          • 如果没有低水位标记(或者说标记总是1),在数据流速较慢或不稳定的情况下,select/poll/epoll 可能会在只有极少量数据可操作时就返回。如果应用程序逻辑是“只要可读/可写就一直读/写”,可能会导致非常频繁地进入和退出内核态,即使每次操作的数据量很小。
          • 低水位标记提供了一种机制,让内核在条件“更有意义”时才通知用户。
        4. 配合应用层协议:
          • 某些应用层协议有最小消息大小或期望一次处理的数据块大小。通过设置合适的低水位标记,可以使得内核通知的时机与应用层的数据处理逻辑更匹配。例如,如果应用总是期望处理至少一个完整的协议单元,可以将 SO_RCVLOWAT 设置为该协议单元的典型大小。
      • 连接的读取半部已经关闭(例如,对端发送了 FIN)。此时调用 read() 或 recv() 将不会阻塞,而是返回 0(表示 EOF)。

      • 套接字上发生了错误(error pending)。此时调用 read() 或 recv() 将不会阻塞,而是返回 -1 并设置 errno 为相应的错误码(例如 ECONNRESET)。这些错误也可以通过 getsockopt(SO_ERROR) 来获取。

    • 数据报套接字 (Datagram Socket, e.g., UDP) 有一个待处理的数据报可供读取。recvfrom() 将不会阻塞。

  • 管道 (Pipes) 或 FIFO

    • 管道中有数据可读。read() 将不会阻塞。
    • 管道的写端已经关闭。read() 将不会阻塞,而是返回 0 (EOF)。
  • 终端设备 (Terminals) 有未读的数据。

  • 常规文件 (Regular Files) 通常总是被认为是可读的(除非指针在文件末尾)。但 select 主要用于网络和管道这类可能阻塞的 I/O。

写就绪

一个文件描述符被认为是可写的,如果以下任一条件成立:

  • 套接字 (Sockets):
    • 已连接的流套接字 (Connected Stream Socket, e.g., TCP):
      • 套接字发送缓冲区中的可用空间大于等于套接字发送缓冲区低水位标记(SO_SNDLOWAT,默认为1),并且套接字已连接(或者不需要连接,如 UDP)。这意味着调用 write() 或 send() 通常不会阻塞(或者阻塞时间会很短,不会超过发送超时)。
      • 连接的写入半部已经关闭。此时调用 write() 或 send() 通常会产生 SIGPIPE 信号,或者返回 -1 并设置 errno 为 EPIPE。注意:即使写入会失败,select 也可能报告其可写,所以需要处理写入错误。
      • 一个非阻塞的 connect() 调用已经成功完成。
      • 套接字上发生了错误(error pending)。此时调用 write() 或 send() 将不会阻塞,而是返回 -1 并设置 errno。这些错误也可以通过 getsockopt(SO_ERROR)来获取。
    • 数据报套接字 (Datagram Socket, e.g., UDP): 通常总是可写的,因为 UDP 是无连接的,发送操作通常不会阻塞,除非内核缓冲区暂时满了。
  • 管道 (Pipes) 或 FIFO:
    • 管道中有足够的空间可供写入数据(至少一个字节)。write() 将不会阻塞。
    • 管道的读端已经关闭。write() 将会产生 SIGPIPE 信号,或者返回 -1 并设置 errno 为 EPIPE。
  • 终端设备 (Terminals): 可以写入。
  • 常规文件 (Regular Files): 通常总是被认为是可写的(除非磁盘空间已满或超出配额)。

异常就绪

一个文件描述符被认为有异常条件,如果:

  • 套接字 (Sockets):
    • 接收到带外数据 (Out-Of-Band, OOB data),仅适用于支持 OOB 数据的协议(如 TCP)。
    • 在某些情况下,当套接字上存在待处理的错误时,也可能在 exceptfds 中报告。但更常见的是通过可读或可写条件来发现错误(即读写操作返回-1)。
  • 伪终端 (Pseudo-terminals): 在包模式 (packet mode) 下,当主控端检测到从属端的状态改变时。

select 的特点

  • 可监控的文件描述符取决于sizeof(fd_set)的大小,以我自己的系统为例,sizeof(fd_set)大小为128 bytes,而每一位都可以标识一个文件描述符,所以可监控的文件描述符为128 * 8 = 1024

    fd_set 的大小可以调整,具体方法可能涉及重新编译内核;

  • 将 fd 加入监控集的同时,还要再使用一个数据结构 array 保存到 select 监控集合中的 fd

    • 一是用于再select 返回后,array作为源数据和fd_set进行FD_ISSET判断;
    • 二是select返回后会把以前加入的但并无事件发生的fd清空,则每次开始select前都要重新从array取得 fd 逐一加入(FD_ZERO最先),扫描array的同时取得fd最大值maxfd,用于select的第一个参数;

select 的缺点

  • 每次调用select, 都需要手动设置fd集合, 从接口使用角度来说也非常不便;
  • 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大;

    用户无法对内核数据进行直接操作,所有的操作都是操作系统通过对应的文件描述符对对应的资源进行操作;

  • 每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大;
  • select支持的文件描述符数量太小

select 的使用实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
#include <iostream>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h> // for close, read, write, STDIN_FILENO
#include <cstring> // for memset, strerror
#include <cstdlib> // for exit, EXIT_FAILURE, EXIT_SUCCESS
#include <vector> // 用于存储客户端套接字
#include <algorithm> // for std::max

// 常量定义
const int BUFFER_SIZE = 1024; // 缓冲区大小
const int DEFAULT_PORT = 8889; // 默认端口

class SelectServer
{
private:
int _listenFd; // 监听套接字
fd_set _masterFds; // 主文件描述符集合, 保存所有需要监视的fd
fd_set _readFds; // 临时的文件描述符集合, 用于select调用
int _maxFd; // 当前监视的最大文件描述符值

// --- 套接字辅助函数 ---
static int Socket()
{
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0)
{
std::cerr << "socket 创建失败: " << strerror(errno) << std::endl;
exit(EXIT_FAILURE);
}
std::cout << "Socket fd 创建成功: " << fd << std::endl;
// 设置端口复用, 以便服务器快速重启
int opt = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
std::cerr << "setsockopt(SO_REUSEADDR) 失败: " << strerror(errno) << std::endl;
// 这里不退出, 但记录错误
}
return fd;
}

static void Bind(int fd, int port)
{
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port); // 将主机字节序转换为网络字节序 (Port)
addr.sin_addr.s_addr = INADDR_ANY; // 监听任意网络接口
if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
{
std::cerr << "bind 错误, 端口 " << port << ": " << strerror(errno) << std::endl;
exit(EXIT_FAILURE);
}
std::cout << "Bind 成功, fd: " << fd << " 到端口 " << port << std::endl;
}

static void Listen(int fd, int backlog = 5) // backlog 是等待连接队列的最大长度
{
if (listen(fd, backlog) < 0)
{
std::cerr << "listen 错误: " << strerror(errno) << std::endl;
exit(EXIT_FAILURE);
}
std::cout << "Listen 成功, fd: " << fd << std::endl;
}

static int Accept(int sock, std::string& clientIp, uint16_t& clientPort)
{
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int clientFd = accept(sock, (struct sockaddr*)&client_addr, &len);
if (clientFd < 0)
{
// 对于阻塞的accept, 这里通常是真正的错误
if (errno != EAGAIN && errno != EWOULDBLOCK) { // 如果是非阻塞模式, EAGAIN/EWOULDBLOCK表示没有连接
std::cerr << "accept 错误: " << strerror(errno) << std::endl;
}
return -1; // 表示错误或没有连接
}
clientIp = inet_ntoa(client_addr.sin_addr); // 将网络字节序的IP转换为点分十进制字符串
clientPort = ntohs(client_addr.sin_port); // 将网络字节序的端口转换为主机字节序
std::cout << "接受新连接。客户端 fd: " << clientFd
<< " 来自 " << clientIp << ":" << clientPort << std::endl;
return clientFd;
}

// --- Select 特定辅助函数 ---
void AddClientFdToMasterSet(int clientFd) {
if (clientFd < 0) return;

FD_SET(clientFd, &_masterFds); // 将新的客户端fd添加到主集合中
if (clientFd > _maxFd) {
_maxFd = clientFd; // 更新最大文件描述符
}
std::cout << "客户端 fd " << clientFd << " 已添加到 select 监视集合。" << std::endl;
}

void RemoveClientFdFromMasterSet(int clientFd) {
if (clientFd < 0) return;

std::cout << "客户端 fd " << clientFd << " 断开连接或出错, 正在关闭并从监视集合中移除。" << std::endl;
close(clientFd);
FD_CLR(clientFd, &_masterFds); // 从主集合中移除

// 如果移除的是最大的fd, 需要重新计算_maxFd
// (一个简单的做法是遍历, 但对于频繁移除可能效率不高。
// 在这个演示中, 为了简单, 我们可以在下次有新连接时更新, 或者保持它不变直到下一个更高的fd加入。
// 更健壮的做法是遍历_masterFds找到新的最大值)
if (clientFd == _maxFd) {
// 简单的重新计算 _maxFd (效率不高, 但对于演示足够)
_maxFd = _listenFd; // 先设为监听fd
for (int i = 0; i <= clientFd; ++i) { // 之前_maxFd是clientFd, 现在要找小于它的
if (FD_ISSET(i, &_masterFds) && i > _maxFd) {
_maxFd = i;
}
}
// 或者, 可以更懒惰一点, 在下一个add时自然更新, 只要保证select的第一个参数够大
}
}

void HandleNewConnection() {
std::string clientIp;
uint16_t clientPort;
int clientFd = Accept(_listenFd, clientIp, clientPort);
if (clientFd > 0) {
AddClientFdToMasterSet(clientFd);
}
}

void HandleClientData(int clientFd) {
char buffer[BUFFER_SIZE];
ssize_t bytes_read = recv(clientFd, buffer, BUFFER_SIZE - 1, 0);

if (bytes_read > 0) {
buffer[bytes_read] = '\0'; // 添加字符串结束符
std::cout << "从 fd " << clientFd << " 收到: " << buffer;
// 简单的回显服务器: 将数据发回给客户端
send(clientFd, buffer, bytes_read, 0);
} else if (bytes_read == 0) {
// 客户端关闭了连接
RemoveClientFdFromMasterSet(clientFd);
} else {
// recv 发生错误
if (errno != EAGAIN && errno != EWOULDBLOCK) { // EAGAIN 针对非阻塞套接字
std::cerr << "fd " << clientFd << " 上的 recv 错误: " << strerror(errno) << std::endl;
RemoveClientFdFromMasterSet(clientFd);
}
}
}

public:
SelectServer(int port) : _listenFd(-1), _maxFd(-1) {
_listenFd = Socket();
Bind(_listenFd, port);
Listen(_listenFd);

FD_ZERO(&_masterFds); // 清空主文件描述符集合
FD_ZERO(&_readFds); // 清空临时读文件描述符集合

// 将监听套接字添加到主集合中
FD_SET(_listenFd, &_masterFds);
_maxFd = _listenFd; // 初始化最大文件描述符

std::cout << "SelectServer 初始化完成。正在监听 fd " << _listenFd << std::endl;
}

~SelectServer() {
if (_listenFd != -1) {
std::cout << "关闭监听 fd: " << _listenFd << std::endl;
close(_listenFd);
}
// 关闭所有仍然在_masterFds中的客户端连接
for (int i = 0; i <= _maxFd; ++i) {
if (FD_ISSET(i, &_masterFds) && i != _listenFd) {
std::cout << "关闭客户端 fd: " << i << std::endl;
close(i);
}
}
std::cout << "SelectServer 关闭。" << std::endl;
}

void Start() {
std::cout << "服务器启动 select 循环..." << std::endl;
while (true) {
_readFds = _masterFds; // 每次循环前, 从主集合复制到临时集合, 因为select会修改它

// select的第一个参数是 nfds, 它应该是当前监视的最大文件描述符值 + 1
// timeout 参数设为 NULL 表示永久阻塞直到有事件发生
int nready = select(_maxFd + 1, &_readFds, NULL, NULL, NULL);

if (nready < 0) {
if (errno == EINTR) continue; // 被信号中断, 重新select
std::cerr << "select 错误: " << strerror(errno) << std::endl;
break; // 严重错误, 退出循环
}

if (nready == 0) {
// 如果设置了超时且超时发生, 会到这里。
// 但由于我们超时设为NULL, 理论上不应该到这里, 除非select行为异常。
// std::cout << "Select 超时。" << std::endl;
continue;
}

// 检查哪些文件描述符就绪了
// 首先检查监听套接字是否有新连接
if (FD_ISSET(_listenFd, &_readFds)) {
HandleNewConnection();
if (--nready <= 0) continue; // 优化: 如果所有就绪事件都已处理
}

// 然后检查所有客户端套接字是否有数据可读
// 依次遍历事件监听位图
// 较大的事件复杂度
// 注意: 从0开始遍历到_maxFd, 但要跳过_listenFd, 因为它已经处理过了
// 或者, 可以维护一个客户端fd列表, 只遍历那个列表
for (int i = 0; i <= _maxFd; ++i) {
if (FD_ISSET(i, &_readFds)) {
if (i == _listenFd) {
// 监听套接字已经处理过了, 跳过
continue;
} else {
// 是一个客户端套接字有数据
HandleClientData(i);
if (--nready <= 0) break; // 优化: 如果所有就绪事件都已处理
}
}
}
}
}
};


int main(int argc, char *argv[]) {
int port = DEFAULT_PORT;
if (argc > 1) {
port = atoi(argv[1]);
if (port <= 0 || port > 65535) {
std::cerr << "无效的端口号: " << argv[1] << std::endl;
port = DEFAULT_PORT;
}
}

SelectServer server(port);
server.Start();

return EXIT_SUCCESS;
}

该程序只检测标准输入(对应的文件描述符为0),当一直不输入时,就会产生超时信息;

poll

鉴于select带来的较大的拷贝开销和遍历成本,又提出了一种新的多路转接方式——poll

函数原型

1
2
3
4
5
6
7
8
9
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

// pollfd结构
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};

参数说明

  • fds:指向pollfd结构体的指针

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // pollfd结构
    struct pollfd {
    int fd; // 当前结构体所监视的文件描述符
    short events; // 当前文件描述符锁关心的时间,具体类型有:
    /*
    POLLIN:表示文件描述符可以进行读取操作(即有数据可读)。
    POLLOUT:表示文件描述符可以进行写入操作(即可以写数据)。
    POLLERR:表示文件描述符发生错误。
    POLLHUP:表示文件描述符被挂起,通常是连接关闭。
    POLLNVAL:表示文件描述符无效。
    */
    short revents; // 返回的事件类型,poll 返回时会修改这个字段,告知哪些事件已经发生
    // 返回值是 events 字段中感兴趣的事件,或者是一些错误事件
    };
  • nfds:表示当前所监视的文件描述符个数,即结构体指针指向的结构体个数

  • timout:这是 poll 等待事件发生的最大时间,单位是毫秒,timeout 的值可以是以下几种:

    • 大于 0:表示等待事件发生的最长时间(毫秒)。poll 会在超时之前返回,或者在事件发生时返回。
    • 0:表示非阻塞模式,poll 不会阻塞,立即返回。如果没有事件发生,则 revents 字段会被设置为 0。
    • -1:表示无限期等待,poll 将会一直阻塞直到某个事件发生。

返回值

  • 返回值小于0, 表示出错;
  • 返回值等于0, 表示poll函数等待超时;
  • 返回值大于0, 表示poll由于监听的文件描述符就绪而返回

socket就绪条件

select相同;

poll 的优点

select来说,省略了三套位图结构,而以一个结构体指针类型代替,优化了数据存储的结构

  1. 首先针对结构体提供了同一的操作接口,而不是select在循环中进行赋值操作;
  2. select的位图数组(大小由fd_set限制,而该限制是硬限制,可以通过修改系统设置来更改)不同,poll依赖于结构体数组实现,所以理论上poll没有最大数量的限制;

poll 的缺点

  1. 尽管poll通过结构体指针实现了操作的统一,但是之后的查询就绪状态依然需要执行遍历操作,尤其是当监视的文件描述符数量较大时,会带来较大的性能消耗;
  2. 内部使用数组维护,动态扩展和删除性能较差,系统需要重新构建一个poll_fd结构体数组,由此会带来较大的拷贝资源消耗;

poll 的使用实例(检测标准输入输出——ReadEvent && ListenEvent)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
#include <iostream>
#include <poll.h>
#include <cstring> // For memset, strerror
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h> // For close, read, write
#include <cstdlib> // For exit, EXIT_FAILURE, EXIT_SUCCESS
#include <vector> // For a more C++ idiomatic way if not restricted
#include <algorithm> // For std::remove_if with vector

// Constants
const int MAX_FDS = 1024; // Maximum number of file descriptors to monitor
const int BUFFER_SIZE = 1024; // Buffer for reading data
const int DEFAULT_PORT = 8888;

class PollServer
{
private:
int _listenFd;
struct pollfd _fds[MAX_FDS]; // Array of pollfd structures
int _numCurrentFds; // Number of actual FDs being monitored (optimization for poll)
// Or, more simply, always poll MAX_FDS and let poll ignore fd = -1

// --- Socket helper functions (static as in original) ---
static int Socket()
{
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0)
{
std::cerr << "socket error: " << strerror(errno) << std::endl;
exit(EXIT_FAILURE);
}
std::cout << "Socket fd created: " << fd << std::endl;
// Set port reuse
int opt = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
std::cerr << "setsockopt(SO_REUSEADDR) failed: " << strerror(errno) << std::endl;
// Not exiting, but good to know
}
return fd;
}

static void Bind(int fd, int port)
{
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
{
std::cerr << "bind error on port " << port << ": " << strerror(errno) << std::endl;
exit(EXIT_FAILURE);
}
std::cout << "Bind successful on fd: " << fd << " to port " << port << std::endl;
}

static void Listen(int fd, int backlog = 5) // Added backlog parameter with default
{
if (listen(fd, backlog) < 0)
{
std::cerr << "listen error: " << strerror(errno) << std::endl;
exit(EXIT_FAILURE);
}
std::cout << "Listen successful on fd: " << fd << std::endl;
}

static int Accept(int sock, std::string& clientIp, uint16_t& clientPort)
{
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int clientFd = accept(sock, (struct sockaddr*)&client_addr, &len);
if (clientFd < 0)
{
// EAGAIN or EWOULDBLOCK means no pending connections (if non-blocking accept)
// For a blocking accept, this is a real error.
// For poll, accept is typically called only when listen_fd is readable.
if (errno != EAGAIN && errno != EWOULDBLOCK) {
std::cerr << "accept error: " << strerror(errno) << std::endl;
}
return -1; // Indicate error or no connection
}
clientIp = inet_ntoa(client_addr.sin_addr);
clientPort = ntohs(client_addr.sin_port);
std::cout << "Accepted new connection. Client fd: " << clientFd
<< " from " << clientIp << ":" << clientPort << std::endl;
return clientFd;
}

// --- Poll specific helpers ---
void AddClientFd(int clientFd) {
if (clientFd < 0) return;

for (int i = 1; i < MAX_FDS; ++i) { // Start from 1 because 0 is _listenFd
if (_fds[i].fd == -1) {
_fds[i].fd = clientFd;
_fds[i].events = POLLIN; // Monitor for readability
_fds[i].revents = 0;
std::cout << "Added client fd " << clientFd << " to poll set at index " << i << std::endl;
// _numCurrentFds can be used to optimize the second argument of poll
// if we compact the array or track the highest index.
// For simplicity, we'll poll all MAX_FDS entries.
return;
}
}
std::cerr << "Too many clients. Cannot add fd: " << clientFd << std::endl;
close(clientFd); // Close if we can't handle it
}

void RemoveClientFd(int index) {
if (index < 0 || index >= MAX_FDS || _fds[index].fd == -1) return;

std::cout << "Client fd " << _fds[index].fd << " at index " << index << " disconnected or error. Closing." << std::endl;
close(_fds[index].fd);
_fds[index].fd = -1;
_fds[index].events = 0;
_fds[index].revents = 0;
// To optimize poll's second argument, you might compact the array here
// or adjust _numCurrentFds if you are tracking the max index.
}

void HandleNewConnection() {
std::string clientIp;
uint16_t clientPort;
int clientFd = Accept(_listenFd, clientIp, clientPort);
if (clientFd > 0) {
AddClientFd(clientFd);
}
}

void HandleClientData(int index) {
char buffer[BUFFER_SIZE];
ssize_t bytes_read = recv(_fds[index].fd, buffer, BUFFER_SIZE -1, 0);

if (bytes_read > 0) {
buffer[bytes_read] = '\0'; // Null-terminate
std::cout << "Received from fd " << _fds[index].fd << ": " << buffer;
// Simple echo server: send data back
send(_fds[index].fd, buffer, bytes_read, 0);
} else if (bytes_read == 0) {
// Connection closed by client
RemoveClientFd(index);
} else {
// Error on recv
if (errno != EAGAIN && errno != EWOULDBLOCK) { // EAGAIN for non-blocking
std::cerr << "recv error on fd " << _fds[index].fd << ": " << strerror(errno) << std::endl;
RemoveClientFd(index);
}
}
}


public:
PollServer(int port) : _listenFd(-1), _numCurrentFds(0) {
_listenFd = Socket();
Bind(_listenFd, port);
Listen(_listenFd);

// Initialize pollfd array
for (int i = 0; i < MAX_FDS; ++i) {
_fds[i].fd = -1; // -1 indicates an unused entry
_fds[i].events = 0;
_fds[i].revents = 0;
}

// Add listen_fd to the set
_fds[0].fd = _listenFd;
_fds[0].events = POLLIN; // Monitor for incoming connections
// _numCurrentFds = 1; // if tracking max index for poll's 2nd arg
std::cout << "PollServer initialized. Listening on fd " << _listenFd << std::endl;
}

~PollServer() {
if (_listenFd != -1) {
std::cout << "Closing listen fd: " << _listenFd << std::endl;
close(_listenFd);
}
for (int i = 1; i < MAX_FDS; ++i) { // Start from 1, 0 is listenFd
if (_fds[i].fd != -1) {
std::cout << "Closing client fd: " << _fds[i].fd << std::endl;
close(_fds[i].fd);
}
}
std::cout << "PollServer shut down." << std::endl;
}

void Start() {
std::cout << "Server starting poll loop..." << std::endl;
// 无限循环
// 直到连接断开
while (true) {
// The second argument to poll is the number of items in the fds array.
// If you optimize by compacting or tracking max index, this could be _numCurrentFds.
// For simplicity, we pass MAX_FDS; poll ignores entries where fd is -1.
int nready = poll(_fds, MAX_FDS, -1); // -1 for infinite timeout

if (nready < 0) {
if (errno == EINTR) continue; // Interrupted by signal, try again
std::cerr << "poll error: " << strerror(errno) << std::endl;
break; // Critical error
}

if (nready == 0) {
// Timeout occurred (if timeout was not -1)
// std::cout << "Poll timeout." << std::endl;
continue;
}

// Check listen_fd first (it's at index 0)
if (_fds[0].revents & POLLIN) {
HandleNewConnection();
if (--nready <= 0) continue; // Optimization: if all handled
}

// Check client fds
// 遍历式查找
// 较高的时间复杂度, 特别是监听的fd数量很大时
// poll的缺点之一
for (int i = 1; i < MAX_FDS; ++i) {
if (_fds[i].fd == -1) continue; // Skip unused entries

if (_fds[i].revents & (POLLIN | POLLERR | POLLHUP)) {
if (_fds[i].revents & POLLIN) {
HandleClientData(i);
}
// POLLERR or POLLHUP might also be set along with POLLIN if error occurred after data arrived
// or if client closed write end (POLLIN for FIN) then fully closed (POLLHUP)
if (_fds[i].revents & (POLLERR | POLLHUP)) {
if (!(_fds[i].revents & POLLIN)) { // If not already handled by HandleClientData reading 0
std::cerr << "Error/Hangup on fd " << _fds[i].fd << ". Events: " << _fds[i].revents << std::endl;
RemoveClientFd(i);
}
}
if (--nready <= 0) break; // Optimization: all handled events processed
}
}
}
}
};


int main(int argc, char *argv[]) {
int port = DEFAULT_PORT;
if (argc > 1) {
port = atoi(argv[1]);
if (port <= 0 || port > 65535) {
std::cerr << "Invalid port number: " << argv[1] << std::endl;
port = DEFAULT_PORT;
}
}

PollServer server(port);
server.Start();

return EXIT_SUCCESS;
}

epoll

——Linux 2.6 版本下公认的性能最好的多路 I/O 就绪通知方法

epoll 相关系统调用

epoll_create

创建一个epoll句柄;

函数原型
1
2
#include <sys/epoll.h>
int epoll_create(int size);
参数说明
  • size:
  • Linux 2.6.8 之前,size 参数用于指定内核为 epoll 实例分配的事件队列的大小。具体来说,它表示内核分配的事件数组的初始大小,即内核为该 epoll 实例保留的空间大小(以事件数量为单位)。如果事件的数量超过这个初始大小,内核会动态地扩展空间。

  • Linux 2.6.8 之后,size参数的作用被弃用了;而内核根据实际需求来分配资源;

返回值
  • 成功:返回一个非负整数,表示创建的 epoll 实例的文件描述符。

  • 失败:如果调用失败,返回 -1,并且设置 errno 以指示错误原因。

epoll_ctl

epoll的事件注册函数;

函数原型
1
2
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *_Nullable event);
参数说明

epfd:epoll_create 的返回值,即 epoll 的句柄;

什么是句柄?

  • 句柄是对资源的抽象引用,用来间接操作资源而不暴露资源的内部细节。

  • 它通常由操作系统或库分配,并通过特定的 API 来进行资源的管理和访问。

  • 句柄的常见应用包括文件、窗口、数据库连接和图形对象等。

  • 句柄和指针的区别在于,指针直接访问内存,而句柄是资源的抽象标识符,底层实现和资源管理由操作系统或库负责。

op:表示具体的操作,具体分为一下三个宏:

  • EPOLL_CTL_ADD:注册新的fd到epfd中

  • EPOLL_CTL_MOD:修改已经注册的fd的监听事件

  • EPOLL_CTL_DEL:从epfd中删除一个fd;

  • fd:表示需要监听的文件描述符;

  • event:表示内核需要监听的具体事件;

struct epoll_event 参数说明

其具体的结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
}epoll_data_t;

struct epoll_event
{
uint32_t events; /* Epoll Event */
epoll_data_t data; /* User data variable */
}__EPOLL_PACKED;

events可以是下列宏的集合(因为是位图结构,所以支持位运算)

  • EPOLLIN:表示对应的文件描述符可读(包括对端SOCKET正常关闭)

  • EPOLLOUT:表示对应的文件描述符可写

  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)

  • EPOLLERR:表示对应的文件描述符发生错误

  • EPOLLHUP:表示对应的文件描述符被挂断

  • EPOLLET :将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的;

  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里;

边缘触发 && 水平触发

  • 边缘触发

    • 解释

      ​ 边缘触发是指当事件从未触发状态变为触发状态时,操作系统会通知应用程序一次。如果事件从未触发状态变为触发状态后,直到事件被清除(处理完成)之前,操作系统不会再通知应用程序。

      ​ 也就是说,只有事件的“变化”才会触发通知。

    • 特点

      • 一次性通知:只有当事件的状态从非触发变为触发时,才会发出通知,之后事件不会再重复通知,除非应用程序明确地清除事件的状态(比如读取数据)。
      • 需要不断轮询:如果应用程序没有及时处理事件(例如,未读取网络数据),则需要不断地进行轮询或等待下一次事件通知。
    • 应用场景

      ​ 边缘触发适用于对事件的处理非常迅速、并且应用程序能够及时清除事件状态的场景。

  • 水平触发

    • 解释

      ​ 水平触发是指,当事件处于触发状态时,操作系统会持续地通知应用程序,直到事件被处理完成并恢复到非触发状态。与边缘触发不同,水平触发会持续发出通知,直到应用程序处理了事件。

    • 特点

      • 持续通知:只要事件的状态仍然是触发状态,操作系统就会持续通知应用程序。应用程序必须处理事件(例如读取数据)才能清除触发状态。
      • 不需要不断轮询:应用程序只需等待事件并处理它,而不需要担心遗漏重复的事件通知。
    • 应用场景

      ​ 水平触发适用于事件状态可能持续存在的场景,且应用程序需要持续收到通知直到事件被处理的情况。

epoll_wait

收集在epoll监控的事件中已经发送的事件;

函数原型
1
2
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
参数说明
  • epfd:

    epoll的文件描述符,表示一个epoll实例,内部维护了多个文件描述符遗迹这些描述符的相关事件;

  • events:

    一个指向epoll_event结构体数组的指针,用于存放epoll_wait返回的就绪事件;在调用 epoll_wait 时,系统会将就绪的事件写入这个数组。应用程序需要遍历这个数组,并处理每个就绪事件;

    struct epoll_event的定义

    1
    2
    3
    4
    struct epoll_event {
    uint32_t events; // 事件类型(例如,EPOLLIN, EPOLLOUT等)
    epoll_data_t data; // 用户数据(通常用于存储文件描述符的相关信息)
    };
    • events:表示文件描述符的状态,epoll_wait 返回的事件类型,可能的值包括:
      • EPOLLIN:表示文件描述符可读。
      • EPOLLOUT:表示文件描述符可写。
      • EPOLLERR:表示文件描述符发生错误。
      • EPOLLHUP:表示文件描述符挂起(例如,连接被断开)。
      • 其他事件,如 EPOLLRDHUP(TCP连接关闭通知)等。
    • data:用户定义的数据,通常用于存储与文件描述符相关的信息。通过它可以区分不同的文件描述符或关联的业务逻辑。例如,存储一个指向自定义结构的指针,或者直接存储文件描述符本身。
  • maxevents

    这是 events 数组的大小,表示最多可以返回多少个就绪事件。epoll_wait 会返回最多 maxevents 个事件,这个数量不超过数组的大小;

  • timeout

    指定 epoll_wait 等待事件的最大时间,单位为毫秒。该参数控制 epoll_wait 的等待行为;

    • timeout = -1:表示无限等待,直到有一个或多个事件就绪为止。epoll_wait 会阻塞直到有文件描述符发生变化(例如,变为可读、可写,或者发生错误等)。
    • timeout = 0:表示非阻塞模式,epoll_wait 会立即返回,不会等待。它会检查所有注册的文件描述符的状态,如果没有就绪事件,会返回 0,表示当前没有文件描述符就绪。
    • timeout > 0:表示最多等待 timeout 毫秒,如果在指定时间内有就绪事件,epoll_wait 会返回;如果没有就绪事件,则在超时后返回 0

    常见应用场景

    • timeout = -1:常用于需要等待直到事件发生的场景,如网络服务器等待连接或数据到来。
    • timeout = 0:常用于非阻塞模式,通常在多线程或多任务的环境中,用来检查文件描述符的状态而不阻塞。
    • timeout > 0:适用于需要定时轮询事件的场景,例如,应用程序需要定期检查文件描述符的状态,并在超时后执行一些其他任务。
返回值
  • 成功:返回就绪的事件数量,表示有多少个文件描述符的事件发生了。这个数量可以小于或等于 maxevents,如果没有事件发生,则返回 0。如果有事件发生但返回的数量小于 maxevents,表示没有更多的就绪事件,程序可以继续处理其他任务。

  • 失败:返回 -1,并设置 errno 来指示错误。常见的错误码包括:

    • EINTR:系统调用被信号中断,需要重新调用 epoll_wait
    • EBADFepfd 不是一个有效的文件描述符。
    • EINVAL:无效的参数,可能是 events 为 NULL 或 maxevents 非法等。

epoll完整代码实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
#include <iostream>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <cstring> // for memset, strerror
#include <cstdlib> // for exit, atoi
#include <vector>

// 常量定义
const int MAX_EVENTS = 10; // epoll_wait 一次最多处理的事件数
const int BUFFER_SIZE = 1024; // 读写缓冲区大小
const int DEFAULT_PORT = 8080; // 默认端口

// 设置文件描述符为非阻塞模式
int set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) {
std::cerr << "fcntl(F_GETFL) 失败: " << strerror(errno) << std::endl;
return -1;
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
std::cerr << "fcntl(F_SETFL) 失败: " << strerror(errno) << std::endl;
return -1;
}
return 0;
}

// Epoll 服务器类
class EpollServer {
private:
int _listenFd; // 监听套接字
int _epollFd; // epoll 实例的文件描述符

// --- 套接字辅助函数 ---
static int Socket() {
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
std::cerr << "socket 创建失败: " << strerror(errno) << std::endl;
exit(EXIT_FAILURE);
}
std::cout << "Socket fd 创建成功: " << fd << std::endl;
// 设置端口复用
int opt = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
std::cerr << "setsockopt(SO_REUSEADDR) 失败: " << strerror(errno) << std::endl;
}
return fd;
}

static void Bind(int fd, int port) {
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
std::cerr << "bind 错误,端口 " << port << ": " << strerror(errno) << std::endl;
exit(EXIT_FAILURE);
}
std::cout << "Bind 成功,fd: " << fd << " 到端口 " << port << std::endl;
}

static void Listen(int fd, int backlog = 128) { // 增加 backlog
if (listen(fd, backlog) < 0) {
std::cerr << "listen 错误: " << strerror(errno) << std::endl;
exit(EXIT_FAILURE);
}
std::cout << "Listen 成功,fd: " << fd << std::endl;
}

// --- Epoll 特定辅助函数 ---
void add_fd_to_epoll(int fd, uint32_t events) {
struct epoll_event ev;
ev.events = events;
ev.data.fd = fd;
if (epoll_ctl(_epollFd, EPOLL_CTL_ADD, fd, &ev) == -1) {
std::cerr << "epoll_ctl 添加 fd " << fd << " 失败: " << strerror(errno) << std::endl;
// 根据情况处理,例如关闭fd
if (fd != _listenFd && fd != STDIN_FILENO) { // 不要关闭监听fd和标准输入
close(fd);
}
} else {
std::cout << "fd " << fd << " 已添加到 epoll 监视,事件: " << events << std::endl;
}
}

void remove_fd_from_epoll(int fd) {
if (epoll_ctl(_epollFd, EPOLL_CTL_DEL, fd, nullptr) == -1) {
std::cerr << "epoll_ctl 删除 fd " << fd << " 失败: " << strerror(errno) << std::endl;
} else {
std::cout << "fd " << fd << " 已从 epoll 监视中移除。" << std::endl;
}
close(fd); // 确保关闭文件描述符
}

void handle_new_connection() {
while (true) { // 对于非阻塞的server_fd,需要循环accept
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int client_fd = accept(_listenFd, (struct sockaddr *)&client_addr, &client_len);

if (client_fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 所有等待的连接都已处理完毕
break;
} else {
std::cerr << "accept 错误: " << strerror(errno) << std::endl;
break; // 其他错误
}
}

std::cout << "接受新连接,来自 " << inet_ntoa(client_addr.sin_addr)
<< ":" << ntohs(client_addr.sin_port) << ",新 fd: " << client_fd << std::endl;

if (set_nonblocking(client_fd) == -1) {
close(client_fd);
continue;
}
// 添加新的客户端连接到 epoll,使用边缘触发(ET)和一次性触发(EPOLLONESHOT)
// EPOLLONESHOT 使得一个fd上的事件只被触发一次,之后需要通过epoll_ctl EPOLL_CTL_MOD重新注册
// 这对于多线程处理单个fd非常有用,防止竞争。对于单线程循环读取,可以不加。
// 这里为了演示ET,加上EPOLLIN | EPOLLET
add_fd_to_epoll(client_fd, EPOLLIN | EPOLLET);
}
}

void handle_client_data(int client_fd) {
char buffer[BUFFER_SIZE];
std::vector<char> total_data; // 使用 vector 动态存储数据

std::cout << "处理来自 fd " << client_fd << " 的数据..." << std::endl;

// ET 模式下,需要循环读取直到 EAGAIN/EWOULDBLOCK
while (true) {
ssize_t bytes_read = read(client_fd, buffer, BUFFER_SIZE -1);
if (bytes_read > 0) {
buffer[bytes_read] = '\0'; // 仅用于调试打印
std::cout << " 读取到 " << bytes_read << " 字节: '" << buffer << "'" << std::endl;
total_data.insert(total_data.end(), buffer, buffer + bytes_read);
} else if (bytes_read == 0) {
// 客户端关闭连接 (EOF)
std::cout << "客户端 fd " << client_fd << " 关闭了连接。" << std::endl;
remove_fd_from_epoll(client_fd); // 从epoll移除并关闭
return; // 处理完毕
} else { // bytes_read == -1
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 非阻塞模式下,数据已读完
std::cout << " fd " << client_fd << " 数据已读完 (EAGAIN/EWOULDBLOCK)。" << std::endl;
break;
} else {
// 发生其他读取错误
std::cerr << "read 从 fd " << client_fd << " 错误: " << strerror(errno) << std::endl;
remove_fd_from_epoll(client_fd); // 从epoll移除并关闭
return; // 处理完毕
}
}
}

if (!total_data.empty()) {
// 回显数据
ssize_t bytes_written_total = 0;
while(bytes_written_total < total_data.size()){
ssize_t bytes_written = write(client_fd, total_data.data() + bytes_written_total, total_data.size() - bytes_written_total);
if(bytes_written > 0){
bytes_written_total += bytes_written;
} else if (bytes_written == -1){
if(errno == EAGAIN || errno == EWOULDBLOCK){
// 发送缓冲区满,可以考虑用 EPOLLOUT 来处理这种情况
std::cout << " fd " << client_fd << " 发送缓冲区满,稍后重试或使用 EPOLLOUT。" << std::endl;
// 对于ET模式,如果想确保数据完全写完,需要将此fd注册EPOLLOUT事件
// 这里简单处理,丢弃未写完的数据或等待下次触发
break;
} else {
std::cerr << "write 到 fd " << client_fd << " 错误: " << strerror(errno) << std::endl;
remove_fd_from_epoll(client_fd);
return;
}
} else { // bytes_written == 0, 理论上不应该发生除非size为0
break;
}
}
std::cout << "已回显 " << bytes_written_total << " 字节到 fd " << client_fd << std::endl;
}
// 如果使用了 EPOLLONESHOT,处理完后需要重新 arm (MOD)
// add_fd_to_epoll(client_fd, EPOLLIN | EPOLLET | EPOLLONESHOT); // 假设 add_fd_to_epoll 内部处理 MOD
// 或者直接:
// struct epoll_event ev;
// ev.events = EPOLLIN | EPOLLET; // 根据是否使用EPOLLONESHOT决定
// ev.data.fd = client_fd;
// if (epoll_ctl(_epollFd, EPOLL_CTL_MOD, client_fd, &ev) == -1) {
// perror("epoll_ctl: mod client_fd");
// remove_fd_from_epoll(client_fd);
// }
}

void handle_stdin_data() {
char buffer[BUFFER_SIZE];
// 标准输入通常是阻塞的,除非也设为非阻塞
// 这里假设是阻塞读取一次
ssize_t bytes_read = read(STDIN_FILENO, buffer, BUFFER_SIZE - 1);
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
std::cout << "从标准输入收到: " << buffer; // printf会自动加换行,cout不会
// 如果标准输入的内容是"quit",则退出服务器
if (strncmp(buffer, "quit", 4) == 0) {
std::cout << "收到 quit 命令,服务器将关闭..." << std::endl;
// 触发一个标志让主循环退出,或者直接exit
// 这里简单起见,直接影响主循环(不推荐在真实应用中这样做)
throw std::runtime_error("Shutdown command received");
}
} else if (bytes_read == 0) {
std::cout << "标准输入 EOF,将不再监听标准输入。" << std::endl;
epoll_ctl(_epollFd, EPOLL_CTL_DEL, STDIN_FILENO, nullptr); // 不再监听标准输入
} else {
std::cerr << "read 从标准输入错误: " << strerror(errno) << std::endl;
}
}


public:
EpollServer(int port) : _listenFd(-1), _epollFd(-1) {
_listenFd = Socket();
if (set_nonblocking(_listenFd) == -1) { // 监听套接字也设为非阻塞,配合循环accept
close(_listenFd);
exit(EXIT_FAILURE);
}
Bind(_listenFd, port);
Listen(_listenFd);

_epollFd = epoll_create1(0); // flags为0表示和epoll_create(size)行为类似,size被忽略
if (_epollFd == -1) {
std::cerr << "epoll_create1 失败: " << strerror(errno) << std::endl;
close(_listenFd);
exit(EXIT_FAILURE);
}

// 添加监听套接字到 epoll,通常使用水平触发(LT)处理新连接更简单
// 如果用ET,则accept需要循环处理
add_fd_to_epoll(_listenFd, EPOLLIN); // LT模式

// 添加标准输入到 epoll
// if (set_nonblocking(STDIN_FILENO) == -1) { // 如果想非阻塞读标准输入
// std::cerr << "无法设置标准输入为非阻塞" << std::endl;
// }
add_fd_to_epoll(STDIN_FILENO, EPOLLIN); // 假设标准输入是LT模式

std::cout << "EpollServer 初始化完成。正在监听 fd " << _listenFd << " 和标准输入。" << std::endl;
}

~EpollServer() {
if (_listenFd != -1) {
std::cout << "关闭监听 fd: " << _listenFd << std::endl;
close(_listenFd);
}
if (_epollFd != -1) {
std::cout << "关闭 epoll fd: " << _epollFd << std::endl;
close(_epollFd);
}
// 注意:在epoll实例关闭前,所有注册的fd理论上应该已经被epoll_ctl_del移除了
// 或者在迭代处理时关闭的。这里只是确保主要的fd被关闭。
std::cout << "EpollServer 关闭。" << std::endl;
}

void Start() {
struct epoll_event events[MAX_EVENTS];
std::cout << "服务器启动 epoll 循环..." << std::endl;
bool running = true;

while (running) {
int nready = epoll_wait(_epollFd, events, MAX_EVENTS, -1); // -1 表示无限等待

if (nready < 0) {
if (errno == EINTR) continue; // 被信号中断,重试
std::cerr << "epoll_wait 错误: " << strerror(errno) << std::endl;
break; // 严重错误
}

for (int i = 0; i < nready; ++i) {
int current_fd = events[i].data.fd;
uint32_t current_events = events[i].events;

if (current_events & (EPOLLERR | EPOLLHUP)) {
// 发生错误或挂断
std::cerr << "fd " << current_fd << " 发生错误或挂断事件。" << std::endl;
if (current_fd != _listenFd && current_fd != STDIN_FILENO) {
remove_fd_from_epoll(current_fd);
}
// 对于监听套接字或标准输入的错误,可能需要更特殊的处理
continue;
}


if (current_fd == _listenFd) {
// 有新的连接请求
if (current_events & EPOLLIN) {
handle_new_connection();
}
} else if (current_fd == STDIN_FILENO) {
// 标准输入有数据
if (current_events & EPOLLIN) {
try {
handle_stdin_data();
} catch (const std::runtime_error& e) {
std::cerr << "处理标准输入时发生异常: " << e.what() << std::endl;
running = false; // 停止服务器
break; // 退出for循环
}
}
} else {
// 是已连接的客户端套接字
if (current_events & EPOLLIN) {
// 有数据可读
handle_client_data(current_fd);
}
// 还可以处理EPOLLOUT事件,如果之前发送时遇到EAGAIN
// if (current_events & EPOLLOUT) {
// handle_client_write(current_fd);
// }
}
}
if(!running) break; // 如果收到退出命令
}
}
};

int main(int argc, char *argv[]) {
int port = DEFAULT_PORT;
if (argc > 1) {
port = atoi(argv[1]);
if (port <= 0 || port > 65535) {
std::cerr << "无效的端口号: " << argv[1] << std::endl;
port = DEFAULT_PORT;
}
}

try {
EpollServer server(port);
server.Start();
} catch (const std::exception& e) {
std::cerr << "服务器运行时发生未捕获的异常: " << e.what() << std::endl;
return EXIT_FAILURE;
}


return EXIT_SUCCESS;
}

三种多路复用技术的对比

特性 select poll epoll
FD 存储 fd_set 位图 pollfd 结构体数组 内核维护 (红黑树管理所有FD,链表管理就绪FD)
最大连接数 FD_SETSIZE (通常 1024) 无硬性限制 (受限于内存) 无硬性限制 (受限于内存, 远大于前两者)
内核/用户拷贝 每次调用都拷贝 fd_set 每次调用都拷贝 pollfd 数组 epoll_ctl 时拷贝一次, epoll_wait 可能使用共享内存 (mmap)
效率/扫描方式 每次轮询所有 FD (O(N)) 每次轮询所有 FD (O(N)) 只返回就绪的 FD, 事件驱动 (O(K),K为就绪FD数)
返回就绪FD 修改传入的 fd_set 不修改, 通过 revents 成员返回 返回就绪 FD 列表
触发模式 水平触发 (LT) 水平触发 (LT) 水平触发 (LT)边缘触发 (ET)
线程安全 (指对监控集合的修改) 不安全 (若多线程共享并修改同一个fd_set,需要外部同步) 相对安全 (每个线程可以有自己的pollfd数组,或对共享数组同步) epoll_ctl 操作是原子性的epfd 本身是线程安全的
可移植性 好 (POSIX) 好 (POSIX) 差 (Linux 特有)
  • 标题: Linux高级IO模型
  • 作者: The Redefine Team
  • 创建于 : 2024-12-05 13:26:00
  • 更新于 : 2025-06-03 14:35:19
  • 链接: https://redefine.ohevan.com/2024/12/05/Linux高级IO模型/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论