第2章:网络IO多路复用——从单线程到高并发
第1章的非阻塞代码能同时处理100个连接吗?能。但CPU会累死在"没有意义的轮询"里。
本章要解决的核心问题是:怎么只处理"有事做"的连接,不理"没事做"的?
这是网络层最重要的一章。彭放在这里花了大量篇幅讲Select和Epoll,不是因为他喜欢API,而是因为一个游戏服务器的性能上限,80%由网络层决定。 业务逻辑再烂,可以慢慢优化;网络层选错模型,架构就死了。
一、为什么需要IO多路复用?回顾第1章的困境
第1章末尾的非阻塞服务端,核心循环是这样的:
while (true) {
// 检查10000个连接,9999个没有数据
for (int fd : all_client_fds) {
recv(fd, ...); // 9999次返回 EAGAIN
}
}CPU在做一件极其低效的事:询问每个socket"你有数据吗?"。绝大多数答案都是"没有",但CPU不得不问。
想象你在一个万人演唱会现场当主持人,你问每一个人"你想唱歌吗?"——绝大多数人说"不"。高效的做法是:谁想唱谁举手,你只点举手的人。
IO多路复用就是这个"举手"机制。内核帮你监控所有socket,只返回"状态变化"的那一批。
flowchart LR
subgraph 轮询["❌ 轮询模式"]
A1[你] --"逐个问10000人"--> B1[10000个socket]
B1 --"9999个说没有"--> A1
end
subgraph 复用["✅ IO多路复用"]
A2[你] --"告诉内核:帮我盯着这10000个"--> K[内核]
K --"只有10个有数据,给你名单"--> A2
A2 --"只处理这10个"--> B2[10个活跃的socket]
endLinux历史上出现过三种IO多路复用机制:
| 机制 | 出现年代 | 最大文件描述符 | 核心思想 | 现状 |
|---|---|---|---|---|
select | 1983 | 1024(默认) | 位图扫描 | 教学/兼容用 |
poll | 1997 | 无上限 | 链表遍历 | select的升级版,但本质一样 |
epoll | 2002 | 无上限 | 事件回调 | 生产环境唯一选择 |
本书重点讲Select和Epoll。Select是"理解原理"的必经之路——如果你懂了Select为什么慢,就懂了Epoll为什么快。
二、Select网络模型:最古老的"举手"机制
Select的核心API
#include <sys/select.h>
int select(int nfds, // 最大fd+1
fd_set *readfds, // 监控"可读"的fd集合
fd_set *writefds, // 监控"可写"的fd集合
fd_set *exceptfds, // 监控"异常"的fd集合
struct timeval *timeout // 超时:NULL=永久阻塞,0=立即返回,其他=等待时长
);fd_set:用位图表示"关注哪些fd"
fd_set readfds;
FD_ZERO(&readfds); // 清空集合
FD_SET(listen_fd, &readfds); // 把监听socket加进去
FD_SET(client_fd, &readfds); // 把客户端socket也加进去
// 检查某个fd是否在集合中
if (FD_ISSET(client_fd, &readfds)) {
// client_fd 有数据可读!
}
// 从集合移除
FD_CLR(client_fd, &readfds);fd_set本质上是一个位图(bitmap):每个bit对应一个文件描述符。FD_SET(5)就是把第5个bit置1。
这是Select的第一个性能瓶颈:位图大小有限(通常是1024个bit),所以Select默认最多监控1024个fd。你要监控fd=10000?对不起,位图不够大。
Select服务端完整代码
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <cstring>
#include <iostream>
#include <vector>
int set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
int main() {
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
set_nonblocking(listen_fd);
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
struct sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(8888);
server_addr.sin_addr.s_addr = INADDR_ANY;
bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
listen(listen_fd, 128);
std::cout << "Select Server on 0.0.0.0:8888\n";
fd_set master_readfds; // 所有需要监控"可读"的fd
FD_ZERO(&master_readfds);
FD_SET(listen_fd, &master_readfds);
int max_fd = listen_fd;
while (true) {
fd_set readfds = master_readfds; // 每次select都要复制!
// select阻塞等待,直到有fd就绪
int ready_count = select(max_fd + 1, &readfds, nullptr, nullptr, nullptr);
if (ready_count < 0) {
if (errno == EINTR) continue; // 被信号中断,重来
perror("select failed");
break;
}
// 检查监听socket:有新连接?
if (FD_ISSET(listen_fd, &readfds)) {
struct sockaddr_in client_addr{};
socklen_t len = sizeof(client_addr);
int client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &len);
if (client_fd >= 0) {
set_nonblocking(client_fd);
FD_SET(client_fd, &master_readfds); // 加入监控集合
if (client_fd > max_fd) max_fd = client_fd;
std::cout << "New client: " << client_fd << "\n";
}
}
// 检查所有客户端socket:有数据?
// 注意:这里必须从3开始遍历到max_fd,因为fd_set没有"存了哪些fd"的信息
for (int fd = 0; fd <= max_fd; ++fd) {
if (fd == listen_fd) continue; // 上面已经处理过了
if (FD_ISSET(fd, &readfds)) {
char buffer[1024];
int recv_len = recv(fd, buffer, sizeof(buffer) - 1, 0);
if (recv_len > 0) {
buffer[recv_len] = '\0';
std::cout << "From " << fd << ": " << buffer << "\n";
send(fd, buffer, recv_len, 0); // echo
} else if (recv_len == 0 || (recv_len < 0 && errno != EAGAIN)) {
// 断开或错误
std::cout << "Client " << fd << " disconnected\n";
close(fd);
FD_CLR(fd, &master_readfds); // 从监控集合移除
// 实际代码需要更新max_fd
}
}
}
}
return 0;
}Select的三重性能瓶颈
flowchart TD
A["Select的性能瓶颈"] --> B["瓶颈1: 位图大小"]
A --> C["瓶颈2: 线性扫描"]
A --> D["瓶颈3: 重复拷贝"]
B --> B1["fd_set默认1024bit\n改FD_SETSIZE编译参数能扩\n但本质不是解决方案"]
C --> C1["每次select返回后\n必须遍历0~max_fd检查FD_ISSET\nO(n)复杂度,n=max_fd"]
D --> D1["每次调用select\n都要把fd_set从用户空间\n拷贝到内核空间\n大量上下文切换"]瓶颈1:位图大小
// 在/usr/include/sys/select.h里,FD_SETSIZE默认1024
#define FD_SETSIZE 1024
// 你可以改,但改了要重新编译所有相关代码,且不解决根本问题瓶颈2:线性扫描
// 假设你只监控了3个fd:3, 5, 1000
// 但select返回后,你还是要从0遍历到1000
for (int fd = 0; fd <= max_fd; ++fd) { // 循环1001次,只命中3次
if (FD_ISSET(fd, &readfds)) { ... }
}瓶颈3:重复拷贝
fd_set readfds = master_readfds; // 每次循环都复制整个位图!
select(max_fd + 1, &readfds, ...); // 再拷贝到内核Select的适用场景
- 跨平台兼容:Windows支持Select,不支持Epoll
- 连接数<100:这时候瓶颈不明显,代码简单
- 教学理解:弄懂Select,才能理解Epoll为什么设计成这样
三、Epoll网络模型:生产环境的唯一选择
Epoll是Linux 2.6引入的(2002年),彻底解决了Select的三个瓶颈。它不是"Select的升级版",而是完全不同的设计哲学。
核心差异:从"轮询"到"事件回调"
| Select | Epoll | |
|---|---|---|
| 告诉内核什么 | "帮我盯着这1024个fd,每次告诉你一遍" | "这个fd给我盯着,有事通知我"(只注册一次) |
| 内核怎么通知 | 返回整个位图,你自己遍历找 | 只返回"有事的fd列表" |
| 复杂度 | O(max_fd) | O(就绪数量) |
| 最大fd | 1024(默认) | 系统内存上限 |
| 数据拷贝 | 每次调用都拷贝 | 注册时拷贝一次,用共享内存 |
Epoll的三个核心API
#include <sys/epoll.h>
// 1. 创建epoll实例
int epoll_fd = epoll_create1(0); // 参数传0即可,旧版用epoll_create(1024)但现在参数被忽略
// 2. 注册/修改/删除监控的fd
struct epoll_event ev;
ev.events = EPOLLIN; // 监控"可读"事件
// EPOLLOUT=可写, EPOLLERR=错误, EPOLLHUP=断开
// EPOLLET=边缘触发(后面讲)
ev.data.fd = client_fd; // 用户数据,epoll返回时原样带回
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev); // 添加
epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client_fd, &ev); // 修改
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr); // 删除
// 3. 等待事件
struct epoll_event events[1024];
int ready_count = epoll_wait(epoll_fd, events, 1024, -1); // -1=永久阻塞,0=立即返回,>0=毫秒超时
// 返回后,events[0..ready_count-1]就是所有就绪的fd
for (int i = 0; i < ready_count; ++i) {
int fd = events[i].data.fd;
uint32_t ev_flags = events[i].events;
// 处理fd的事件...
}Epoll服务端完整代码
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <cstring>
#include <iostream>
int set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
int main() {
// ========== 创建监听socket ==========
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
set_nonblocking(listen_fd);
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
struct sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(8888);
server_addr.sin_addr.s_addr = INADDR_ANY;
bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
listen(listen_fd, 128);
std::cout << "Epoll Server on 0.0.0.0:8888\n";
// ========== 创建epoll实例 ==========
int epoll_fd = epoll_create1(0);
if (epoll_fd < 0) {
perror("epoll_create1 failed");
return -1;
}
// 把监听socket加入epoll
struct epoll_event ev{};
ev.events = EPOLLIN;
ev.data.fd = listen_fd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev);
// ========== 事件循环 ==========
struct epoll_event events[1024];
const int MAX_EVENTS = 1024;
while (true) {
// 等待事件,-1表示永久阻塞直到有事件
int ready_count = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (ready_count < 0) {
if (errno == EINTR) continue;
perror("epoll_wait failed");
break;
}
// 只遍历"有事的"fd,最多1024个
for (int i = 0; i < ready_count; ++i) {
int fd = events[i].data.fd;
uint32_t ev = events[i].events;
// 错误事件
if (ev & (EPOLLERR | EPOLLHUP)) {
std::cout << "fd " << fd << " error/hup, closing\n";
close(fd);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
continue;
}
// ========== 监听socket:新连接 ==========
if (fd == listen_fd) {
while (true) { // 边缘触发模式下要accept到EAGAIN为止
struct sockaddr_in client_addr{};
socklen_t len = sizeof(client_addr);
int client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &len);
if (client_fd < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break; // 没有更多连接了
}
perror("accept failed");
break;
}
set_nonblocking(client_fd);
struct epoll_event client_ev{};
client_ev.events = EPOLLIN | EPOLLET; // 边缘触发!
client_ev.data.fd = client_fd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &client_ev);
std::cout << "New client: " << client_fd << "\n";
}
}
// ========== 客户端socket:有数据 ==========
else if (ev & EPOLLIN) {
char buffer[1024];
int recv_len = recv(fd, buffer, sizeof(buffer) - 1, 0);
if (recv_len > 0) {
buffer[recv_len] = '\0';
std::cout << "From " << fd << ": " << buffer << "\n";
send(fd, buffer, recv_len, 0); // echo
} else if (recv_len == 0) {
// 客户端主动关闭
std::cout << "Client " << fd << " disconnected\n";
close(fd);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
} else if (errno != EAGAIN && errno != EWOULDBLOCK) {
// 真正的错误
perror("recv error");
close(fd);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
}
}
}
}
close(epoll_fd);
close(listen_fd);
return 0;
}水平触发(LT)vs 边缘触发(ET)
这是Epoll最容易踩的坑。
flowchart LR
subgraph LT["水平触发 Level Trigger"]
A1[fd有数据] --"epoll_wait返回"--> B1[你只读了部分]
B1 --"再调epoll_wait"--> C1[又返回!直到数据读完"]
style C1 fill:#ffcccc
end
subgraph ET["边缘触发 Edge Trigger"]
A2[fd有数据] --"epoll_wait返回一次"--> B2[你必须读完]
B2 --"再调epoll_wait"--> C2[不返回!直到新数据来"]
style B2 fill:#ccffcc
end| 水平触发 (LT) | 边缘触发 (ET) | |
|---|---|---|
| 触发条件 | fd有未读数据 → 一直报告 | 数据"从无到有"或"从少到多" → 只报告一次 |
| 代码要求 | 简单,读不读完无所谓 | 必须把数据读完,否则丢失事件 |
| 性能 | 可能重复触发 | 触发次数最少 |
| 配合模式 | 阻塞或非阻塞都可以 | 必须非阻塞(否则可能在read中阻塞) |
ET的陷阱:如果你recv()一次没读完缓冲区所有数据,epoll不会再次通知你——它觉得"我已经通知过了,你没读完是你的事"。
解决方案:ET模式下,收到EPOLLIN后,循环recv()直到返回EAGAIN。
// ET模式下的正确读法
if (ev & EPOLLIN) {
while (true) {
int recv_len = recv(fd, buffer, sizeof(buffer), 0);
if (recv_len > 0) {
// 处理数据
} else if (recv_len < 0 && errno == EAGAIN) {
break; // 读完了,真正的缓冲区空
} else {
// 断开或错误
close(fd);
break;
}
}
}生产环境一般用ET + 非阻塞,因为触发次数最少,性能最高。但代价是代码更复杂,一不注意就丢事件。
Epoll的数据结构内幕(简化版)
Epoll为什么快?因为它在内核里用了红黑树 + 就绪链表:
用户空间 内核空间
│ │
│ epoll_ctl(ADD, fd) │
│───────────────────────▶│
│ │ ┌─────────────────┐
│ │ │ 红黑树 (RBTree) │
│ │ │ 存储所有监控的fd │
│ │ │ key=fd, value=epoll_event │
│ │ └─────────────────┘
│ │ │
│ epoll_wait() │ ▼
│──────────────────────▶│ ┌─────────────────┐
│ │ │ 就绪链表 │
│◄──────────────────────│ │ 网卡中断时, │
│ 返回events[] │ │ 有数据的fd插到这里│
│ (只含就绪的) │ └─────────────────┘- 红黑树:存储所有被epoll监控的fd,增删查都是O(log n)。注册一次,长期有效。
- 就绪链表:网卡收到数据 → 内核中断处理 → 把对应fd插到就绪链表。
epoll_wait()只从链表取数据,不需要遍历所有fd。
这就是"O(就绪数量)"复杂度的来源。
四、Buffer设计:网络层的核心数据结构
TCP是字节流,没有消息边界。recv()可能给你半条消息、一条消息、或者一条半消息。Buffer的作用就是"攒数据,凑够一条完整消息再上交"。
一个最简单的Buffer
#include <vector>
#include <cstring>
#include <algorithm>
class Buffer {
public:
static const size_t INITIAL_SIZE = 1024;
Buffer() : buffer_(INITIAL_SIZE), read_index_(0), write_index_(0) {}
// 可写字节数
size_t writable_bytes() const {
return buffer_.size() - write_index_;
}
// 可读字节数
size_t readable_bytes() const {
return write_index_ - read_index_;
}
// 写入数据(从recv来的)
void append(const char* data, size_t len) {
ensure_writable(len);
memcpy(begin_write(), data, len);
write_index_ += len;
}
// 读取数据(交给上层解析)
const char* peek() const {
return begin() + read_index_;
}
void retrieve(size_t len) {
if (len < readable_bytes()) {
read_index_ += len;
} else {
retrieve_all();
}
}
void retrieve_all() {
read_index_ = 0;
write_index_ = 0;
}
// 获取可写位置的指针(给recv用)
char* begin_write() {
return begin() + write_index_;
}
private:
char* begin() { return buffer_.data(); }
const char* begin() const { return buffer_.data(); }
void ensure_writable(size_t len) {
if (writable_bytes() < len) {
// 如果有废弃空间,先整理
if (read_index_ > 0) {
size_t readable = readable_bytes();
memmove(begin(), peek(), readable);
read_index_ = 0;
write_index_ = readable;
}
// 还不够就扩容
if (writable_bytes() < len) {
buffer_.resize(write_index_ + len);
}
}
}
std::vector<char> buffer_;
size_t read_index_; // 读指针
size_t write_index_; // 写指针
};Buffer使用流程
sequenceDiagram
participant Net as 网络层
participant Buf as Buffer
participant Parser as Packet解析器
participant Logic as 业务逻辑
Net->>Buf: recv() 得到 "[HALF_MSG_1]"
Buf->>Buf: append(data, len)
Note over Buf: 可读: 半条消息
Net->>Buf: recv() 得到 "[HALF_MSG_1_END][MSG_2]"
Buf->>Buf: append(data, len)
Note over Buf: 可读: 一条半消息
Parser->>Buf: peek() 检查长度字段
Parser->>Buf: retrieve(msg1_len) 取走第一条
Parser->>Logic: 解析后的消息对象
Parser->>Buf: 检查剩余数据
Note over Buf: 还有MSG_2的一半
Net->>Buf: recv() 得到 "[MSG_2_REST]"
Buf->>Buf: append
Parser->>Buf: 现在MSG_2完整了,retrieve取走生产级Buffer的考虑
- 内存池:频繁new/delete会碎片化。游戏服一般用定长内存池或ring buffer。
- 上限保护:恶意客户端不发完整消息头,Buffer无限增长。要设上限,超了就踢。
- 零拷贝:Linux有
splice()和sendfile(),数据不经过用户空间。但游戏协议通常需要解析,零拷贝场景少。 - 双Buffer:读Buffer + 写Buffer分离,避免读写竞争。
五、Packet设计:给字节流加上"边界"
Buffer解决"攒数据",Packet解决"怎么知道一条消息多长"。
最常见的协议头设计:长度 + 类型 + payload
┌─────────┬─────────┬─────────────────────┐
│ 长度(2B) │ 类型(2B) │ Payload(n B) │
│ uint16 │ uint16 │ protobuf/json/... │
└─────────┴─────────┴─────────────────────┘
↑ ↑
总长度 = 4 + n 实际业务数据#pragma pack(push, 1) // 按1字节对齐,不要填充
struct PacketHeader {
uint16_t len; // 总长度(头+payload)
uint16_t msg_type; // 消息类型,比如1=登录,2=移动,3=攻击...
};
#pragma pack(pop)
const uint16_t HEADER_SIZE = sizeof(PacketHeader);Packet解析流程
bool parse_packet(Buffer& buf, uint16_t& out_type, std::string& out_payload) {
// 1. 检查是否有完整的头
if (buf.readable_bytes() < HEADER_SIZE) {
return false; // 数据不够,等下一次recv
}
// 2. 读取头(注意字节序!网络序要转主机序)
const PacketHeader* header = reinterpret_cast<const PacketHeader*>(buf.peek());
uint16_t total_len = ntohs(header->len);
uint16_t msg_type = ntohs(header->msg_type);
// 3. 检查是否有完整的payload
if (buf.readable_bytes() < total_len) {
return false; // payload还没全到
}
// 4. 提取payload
out_type = msg_type;
out_payload.assign(buf.peek() + HEADER_SIZE, total_len - HEADER_SIZE);
// 5. 从Buffer移除这条消息
buf.retrieve(total_len);
return true;
}常见协议头变体
| 类型 | 格式 | 适用场景 |
|---|---|---|
| 固定长度 | 4字节头(2B长度+2B类型) | 简单,最常用 |
| 分隔符 | \r\n或\0分隔 | 文本协议,如HTTP、Redis |
| 自描述长度 | 变长编码(如protobuf的varint) | 节省带宽 |
| 魔数+版本 | 0xABCD魔数 + 版本号 + 长度 | 防协议混淆,支持升级 |
六、protobuf协议定义与序列化
JSON对人类友好,对机器不友好——解析慢、体积大、类型弱。游戏服用protobuf是行业标配。
定义协议文件
// game.proto
syntax = "proto3";
package GameProto;
// 登录请求
message LoginReq {
string account = 1; // 账号
string password = 2; // 密码(实际应该是token或签名)
int32 platform = 3; // 平台:1=iOS, 2=Android, 3=PC
}
// 登录响应
message LoginResp {
int32 result = 1; // 0=成功,其他=错误码
int64 player_id = 2; // 玩家唯一ID
string token = 3; // 会话token
}
// 移动请求
message MoveReq {
int32 scene_id = 1; // 场景ID
float x = 2; // 目标X坐标
float y = 3; // 目标Y坐标
float z = 4; // 目标Z坐标
}
// 移动广播(服务端推给周围玩家)
message MoveNotify {
int64 player_id = 1;
float x = 2;
float y = 3;
float z = 4;
int64 timestamp = 5; // 服务器时间戳,用于校验和排序
}C++使用protobuf
#include "game.pb.h" // protoc生成的头文件
// ===== 序列化:对象 → 字节流 =====
GameProto::LoginReq req;
req.set_account("player_123");
req.set_password("hashed_token_here");
req.set_platform(2);
// 序列化到string
std::string serialized;
req.SerializeToString(&serialized);
// 封包:长度 + 类型 + protobuf payload
uint16_t msg_type = 1001; // 假设1001是LoginReq
uint16_t total_len = HEADER_SIZE + serialized.size();
char packet[4096];
PacketHeader header;
header.len = htons(total_len);
header.msg_type = htons(msg_type);
memcpy(packet, &header, HEADER_SIZE);
memcpy(packet + HEADER_SIZE, serialized.data(), serialized.size());
send(client_fd, packet, total_len, 0);
// ===== 反序列化:字节流 → 对象 =====
// 假设buf已经通过Buffer和Packet解析拿到了payload
std::string payload = ...; // 从Buffer提取的protobuf二进制数据
GameProto::LoginReq received_req;
if (received_req.ParseFromString(payload)) {
std::cout << "Account: " << received_req.account() << "\n";
std::cout << "Platform: " << received_req.platform() << "\n";
}protobuf的优势
| 优势 | 说明 |
|---|---|
| 紧凑 | 变长编码,数字越小占字节越少 |
| 快速 | 二进制解析,比JSON快10-100倍 |
| 类型安全 | 编译期检查字段类型 |
| 向后兼容 | 新增字段不破坏旧代码 |
| 多语言 | 自动生成C++/Java/Python/Go… |
工程建议
- 版本管理:proto文件是接口契约,改动要慎重。建议用Git管理,重大变更发版本号。
- 字段编号不要复用:protobuf靠字段编号识别字段,删掉一个字段后别复用它的编号——旧代码会误解。
- optional和repeated:proto3默认所有字段都是optional(不用显式标),但repeated字段要显式声明。
- 不要直接传密码:示例里的
password字段只是为了演示。真实游戏里,客户端应该传登录token(比如SDK校验后的session),服务端永远不要信任客户端发的密码。
七、全书libserver框架的Network基类初探
彭放自研的libserver框架封装了Select和Epoll的差异。本章末尾,他把前面的所有概念整合成一个可扩展的架构:
libserver网络层架构(简化)
Network.h/cpp # 纯虚基类,定义接口
├── NetworkSelect.h # Select实现
└── NetworkEpoll.h # Epoll实现
NetworkListen.h # 监听端口
NetworkConnector.h # 主动连接(连别的服)
Buffer.h # 收发缓冲区
Packet.h # 封包/解包
MessageQueue.h # 网络层和业务层的消息队列关键设计:网络线程和业务线程分离
flowchart LR
subgraph NetThread["网络线程"]
A1[epoll_wait] --"收数据"--> B1[Buffer]
B1 --"凑够一条消息"--> C1[Packet解析]
C1 --"放入队列"--> D1[MsgQueue]
end
subgraph BizThread["业务线程"]
D1 --"取出消息"--> E1[业务逻辑处理]
E1 --"生成回复"--> F1[MsgQueue]
end
subgraph NetThread2["网络线程"]
F1 --"取出回复"--> G1[Packet序列化]
G1 --"写入Buffer"--> H1[send]
end为什么要分离?
因为epoll_wait()是阻塞的(或带超时的),业务逻辑可能也是阻塞的(比如查数据库)。如果混在一起,一个玩家"发了条消息触发数据库查询",会导致整个epoll循环卡住,其他玩家都收不到消息。
分离后的原则:
- 网络线程只管"收发数据"和"协议解析",不做业务
- 业务线程从队列取消息,处理后塞回复到另一个队列
- 队列是唯一的共享资源,用无锁队列或单生产者单消费者避免锁竞争
八、工程建议与常见坑点
✅ 做对的事
监听socket也用非阻塞+ET。高并发时,连接蜂拥而至,LT模式下可能触发多次。ET模式下一次
accept要循环到EAGAIN,确保全接完。设置SO_KEEPALIVE。TCP自带保活机制,但默认间隔太长(2小时)。游戏服应该在应用层做心跳,更快发现问题。
处理EINTR。
epoll_wait()可能被信号中断,返回-1且errno==EINTR。要重试,不能当错误退出。控制epoll_wait的maxevents。1024够用吗?如果你预期单线程处理1万并发,设成
maxevents = 10000。返回的数组要够大。单线程Reactor vs 多线程Reactor。单线程epoll适合CPU密集度不高的场景(如聊天服)。游戏战斗服逻辑重,可以用"一个线程epoll收数据 + 多个线程处理业务"(多Reactors)。
❌ 常见的坑
ET模式下没读完数据。这是Epoll ET的头号杀手bug。表现是:客户端发了消息,服务端不触发——因为上一次触发时你没读完,epoll认为"已经通知过了"。
在epoll事件处理里做阻塞操作。比如收到登录消息,立刻查MySQL验证密码。这条连接的处理被卡住了,但更糟糕的是——如果你是用单线程Reactor,所有连接都跟着卡。
忘记处理EPOLLOUT。非阻塞模式下,如果发送缓冲区满,
send()会返回EAGAIN。这时候要注册EPOLLOUT事件,等内核通知"可写了"再发。很多示例代码为了简单忽略了这点,但生产环境不能忽略。fd泄漏。
close(fd)忘了epoll_ctl(DEL),或者反过来。epoll会监控一个已关闭的fd吗?不会,但你的数据结构里可能还存着引用,导致逻辑混乱。大小端问题。x86是小端,网络协议是大端。所有多字节字段发出去之前都要
htons()/htonl(),收到之后都要ntohs()/ntohl()。漏一次,调试到死。
九、Select vs Epoll:终极对比
flowchart TD
subgraph SelectDetail["Select"]
S1["O(max_fd) 扫描"]
S2["fd_set拷贝开销"]
S3["默认1024限制"]
S4["跨平台"]
end
subgraph EpollDetail["Epoll"]
E1["O(就绪数) 事件驱动"]
E2["mmap共享内存"]
E3["无fd上限"]
E4["Linux only"]
end
SelectDetail --> S5["适用: <100连接\n教学/兼容Windows"]
EpollDetail --> E5["适用: >1000连接\n所有生产环境"]| 场景 | 推荐 |
|---|---|
| 学习理解IO多路复用原理 | Select |
| 写跨平台代码(Win+Linux) | Select(或libevent/libuv封装) |
| 生产环境Linux游戏服 | Epoll,没有例外 |
| 10万+并发 | Epoll + 多线程 + 多进程 |
十、本章核心回顾
- IO多路复用的目的:避免轮询所有socket,只处理"有事的"
- Select:位图+线性扫描,教学用,生产环境别用
- Epoll:红黑树+就绪链表,O(就绪数),生产唯一选择
- LT vs ET:ET性能高但容易丢事件,必须循环读到EAGAIN
- Buffer:解决TCP字节流没有消息边界的问题
- Packet:给消息加头(长度+类型),让Buffer知道"凑够多少算一条"
- protobuf:游戏协议的标配,紧凑、快速、类型安全
- 网络线程和业务线程必须分离:epoll循环不能被业务阻塞
十一、进阶作业
基于本章代码,实现一个功能完整的Echo服务器升级:
- 用Epoll ET模式,正确处理accept和recv的循环
- 实现Buffer + Packet层,协议头用
2B长度 + 2B类型 - 用protobuf定义消息:LoginReq/LoginResp/ChatMsg
- 支持多客户端:每个客户端分配player_id,聊天消息广播给所有人
- 添加心跳检测:30秒没收到心跳就踢掉
这个作业的代码量大约是本章示例的3-5倍,但完成后你就拥有了一个"可以写进简历"的雏形游戏聊天服务器。
Select到Epoll的演进,本质上是"从’我查你’到’你通知我’的权力转移。
这个转移在游戏服务器里至关重要——因为你要查的不是10个fd,是10万个。
当你开始用Epoll,才终于可以说:"这台服务器, ready for production."
"能处理100个连接的叫demo,能处理10万个连接的叫产品。" 🖤