多人在线游戏架构实战第2章:网络IO多路复用——从单线程到高并发

📑 目录

第2章:网络IO多路复用——从单线程到高并发

第1章的非阻塞代码能同时处理100个连接吗?能。但CPU会累死在"没有意义的轮询"里。
本章要解决的核心问题是:怎么只处理"有事做"的连接,不理"没事做"的?

这是网络层最重要的一章。彭放在这里花了大量篇幅讲Select和Epoll,不是因为他喜欢API,而是因为一个游戏服务器的性能上限,80%由网络层决定。 业务逻辑再烂,可以慢慢优化;网络层选错模型,架构就死了。


一、为什么需要IO多路复用?回顾第1章的困境

第1章末尾的非阻塞服务端,核心循环是这样的:

while (true) {
    // 检查10000个连接,9999个没有数据
    for (int fd : all_client_fds) {
        recv(fd, ...);  // 9999次返回 EAGAIN
    }
}

CPU在做一件极其低效的事:询问每个socket"你有数据吗?"。绝大多数答案都是"没有",但CPU不得不问。

想象你在一个万人演唱会现场当主持人,你问每一个人"你想唱歌吗?"——绝大多数人说"不"。高效的做法是:谁想唱谁举手,你只点举手的人。

IO多路复用就是这个"举手"机制。内核帮你监控所有socket,只返回"状态变化"的那一批。

flowchart LR
    subgraph 轮询["❌ 轮询模式"]
        A1[你] --"逐个问10000人"--> B1[10000个socket]
        B1 --"9999个说没有"--> A1
    end
    
    subgraph 复用["✅ IO多路复用"]
        A2[你] --"告诉内核:帮我盯着这10000个"--> K[内核]
        K --"只有10个有数据,给你名单"--> A2
        A2 --"只处理这10个"--> B2[10个活跃的socket]
    end

Linux历史上出现过三种IO多路复用机制:

机制出现年代最大文件描述符核心思想现状
select19831024(默认)位图扫描教学/兼容用
poll1997无上限链表遍历select的升级版,但本质一样
epoll2002无上限事件回调生产环境唯一选择

本书重点讲Select和Epoll。Select是"理解原理"的必经之路——如果你懂了Select为什么慢,就懂了Epoll为什么快。


二、Select网络模型:最古老的"举手"机制

Select的核心API

#include <sys/select.h>

int select(int nfds,           // 最大fd+1
           fd_set *readfds,    // 监控"可读"的fd集合
           fd_set *writefds,   // 监控"可写"的fd集合
           fd_set *exceptfds,  // 监控"异常"的fd集合
           struct timeval *timeout  // 超时:NULL=永久阻塞,0=立即返回,其他=等待时长
           );

fd_set:用位图表示"关注哪些fd"

fd_set readfds;
FD_ZERO(&readfds);      // 清空集合
FD_SET(listen_fd, &readfds);  // 把监听socket加进去
FD_SET(client_fd, &readfds);  // 把客户端socket也加进去

// 检查某个fd是否在集合中
if (FD_ISSET(client_fd, &readfds)) {
    // client_fd 有数据可读!
}

// 从集合移除
FD_CLR(client_fd, &readfds);

fd_set本质上是一个位图(bitmap):每个bit对应一个文件描述符。FD_SET(5)就是把第5个bit置1。

这是Select的第一个性能瓶颈:位图大小有限(通常是1024个bit),所以Select默认最多监控1024个fd。你要监控fd=10000?对不起,位图不够大。

Select服务端完整代码

#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <cstring>
#include <iostream>
#include <vector>

int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

int main() {
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    set_nonblocking(listen_fd);
    
    int opt = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    
    struct sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(8888);
    server_addr.sin_addr.s_addr = INADDR_ANY;
    
    bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
    listen(listen_fd, 128);
    
    std::cout << "Select Server on 0.0.0.0:8888\n";
    
    fd_set master_readfds;      // 所有需要监控"可读"的fd
    FD_ZERO(&master_readfds);
    FD_SET(listen_fd, &master_readfds);
    
    int max_fd = listen_fd;
    
    while (true) {
        fd_set readfds = master_readfds;  // 每次select都要复制!
        
        // select阻塞等待,直到有fd就绪
        int ready_count = select(max_fd + 1, &readfds, nullptr, nullptr, nullptr);
        
        if (ready_count < 0) {
            if (errno == EINTR) continue;  // 被信号中断,重来
            perror("select failed");
            break;
        }
        
        // 检查监听socket:有新连接?
        if (FD_ISSET(listen_fd, &readfds)) {
            struct sockaddr_in client_addr{};
            socklen_t len = sizeof(client_addr);
            int client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &len);
            
            if (client_fd >= 0) {
                set_nonblocking(client_fd);
                FD_SET(client_fd, &master_readfds);  // 加入监控集合
                if (client_fd > max_fd) max_fd = client_fd;
                std::cout << "New client: " << client_fd << "\n";
            }
        }
        
        // 检查所有客户端socket:有数据?
        // 注意:这里必须从3开始遍历到max_fd,因为fd_set没有"存了哪些fd"的信息
        for (int fd = 0; fd <= max_fd; ++fd) {
            if (fd == listen_fd) continue;  // 上面已经处理过了
            
            if (FD_ISSET(fd, &readfds)) {
                char buffer[1024];
                int recv_len = recv(fd, buffer, sizeof(buffer) - 1, 0);
                
                if (recv_len > 0) {
                    buffer[recv_len] = '\0';
                    std::cout << "From " << fd << ": " << buffer << "\n";
                    send(fd, buffer, recv_len, 0);  // echo
                } else if (recv_len == 0 || (recv_len < 0 && errno != EAGAIN)) {
                    // 断开或错误
                    std::cout << "Client " << fd << " disconnected\n";
                    close(fd);
                    FD_CLR(fd, &master_readfds);  // 从监控集合移除
                    // 实际代码需要更新max_fd
                }
            }
        }
    }
    
    return 0;
}

Select的三重性能瓶颈

flowchart TD
    A["Select的性能瓶颈"] --> B["瓶颈1: 位图大小"]
    A --> C["瓶颈2: 线性扫描"]
    A --> D["瓶颈3: 重复拷贝"]
    
    B --> B1["fd_set默认1024bit\n改FD_SETSIZE编译参数能扩\n但本质不是解决方案"]
    C --> C1["每次select返回后\n必须遍历0~max_fd检查FD_ISSET\nO(n)复杂度,n=max_fd"]
    D --> D1["每次调用select\n都要把fd_set从用户空间\n拷贝到内核空间\n大量上下文切换"]

瓶颈1:位图大小

// 在/usr/include/sys/select.h里,FD_SETSIZE默认1024
#define FD_SETSIZE 1024
// 你可以改,但改了要重新编译所有相关代码,且不解决根本问题

瓶颈2:线性扫描

// 假设你只监控了3个fd:3, 5, 1000
// 但select返回后,你还是要从0遍历到1000
for (int fd = 0; fd <= max_fd; ++fd) {  // 循环1001次,只命中3次
    if (FD_ISSET(fd, &readfds)) { ... }
}

瓶颈3:重复拷贝

fd_set readfds = master_readfds;  // 每次循环都复制整个位图!
select(max_fd + 1, &readfds, ...);  // 再拷贝到内核

Select的适用场景

  • 跨平台兼容:Windows支持Select,不支持Epoll
  • 连接数<100:这时候瓶颈不明显,代码简单
  • 教学理解:弄懂Select,才能理解Epoll为什么设计成这样

三、Epoll网络模型:生产环境的唯一选择

Epoll是Linux 2.6引入的(2002年),彻底解决了Select的三个瓶颈。它不是"Select的升级版",而是完全不同的设计哲学。

核心差异:从"轮询"到"事件回调"

SelectEpoll
告诉内核什么"帮我盯着这1024个fd,每次告诉你一遍""这个fd给我盯着,有事通知我"(只注册一次)
内核怎么通知返回整个位图,你自己遍历找只返回"有事的fd列表"
复杂度O(max_fd)O(就绪数量)
最大fd1024(默认)系统内存上限
数据拷贝每次调用都拷贝注册时拷贝一次,用共享内存

Epoll的三个核心API

#include <sys/epoll.h>

// 1. 创建epoll实例
int epoll_fd = epoll_create1(0);  // 参数传0即可,旧版用epoll_create(1024)但现在参数被忽略

// 2. 注册/修改/删除监控的fd
struct epoll_event ev;
ev.events = EPOLLIN;      // 监控"可读"事件
                          // EPOLLOUT=可写, EPOLLERR=错误, EPOLLHUP=断开
                          // EPOLLET=边缘触发(后面讲)
ev.data.fd = client_fd;   // 用户数据,epoll返回时原样带回

epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev);   // 添加
epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client_fd, &ev);   // 修改
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr); // 删除

// 3. 等待事件
struct epoll_event events[1024];
int ready_count = epoll_wait(epoll_fd, events, 1024, -1);  // -1=永久阻塞,0=立即返回,>0=毫秒超时

// 返回后,events[0..ready_count-1]就是所有就绪的fd
for (int i = 0; i < ready_count; ++i) {
    int fd = events[i].data.fd;
    uint32_t ev_flags = events[i].events;
    // 处理fd的事件...
}

Epoll服务端完整代码

#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <cstring>
#include <iostream>

int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

int main() {
    // ========== 创建监听socket ==========
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    set_nonblocking(listen_fd);
    
    int opt = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    
    struct sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(8888);
    server_addr.sin_addr.s_addr = INADDR_ANY;
    
    bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
    listen(listen_fd, 128);
    
    std::cout << "Epoll Server on 0.0.0.0:8888\n";
    
    // ========== 创建epoll实例 ==========
    int epoll_fd = epoll_create1(0);
    if (epoll_fd < 0) {
        perror("epoll_create1 failed");
        return -1;
    }
    
    // 把监听socket加入epoll
    struct epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = listen_fd;
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev);
    
    // ========== 事件循环 ==========
    struct epoll_event events[1024];
    const int MAX_EVENTS = 1024;
    
    while (true) {
        // 等待事件,-1表示永久阻塞直到有事件
        int ready_count = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        
        if (ready_count < 0) {
            if (errno == EINTR) continue;
            perror("epoll_wait failed");
            break;
        }
        
        // 只遍历"有事的"fd,最多1024个
        for (int i = 0; i < ready_count; ++i) {
            int fd = events[i].data.fd;
            uint32_t ev = events[i].events;
            
            // 错误事件
            if (ev & (EPOLLERR | EPOLLHUP)) {
                std::cout << "fd " << fd << " error/hup, closing\n";
                close(fd);
                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
                continue;
            }
            
            // ========== 监听socket:新连接 ==========
            if (fd == listen_fd) {
                while (true) {  // 边缘触发模式下要accept到EAGAIN为止
                    struct sockaddr_in client_addr{};
                    socklen_t len = sizeof(client_addr);
                    int client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &len);
                    
                    if (client_fd < 0) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
                            break;  // 没有更多连接了
                        }
                        perror("accept failed");
                        break;
                    }
                    
                    set_nonblocking(client_fd);
                    
                    struct epoll_event client_ev{};
                    client_ev.events = EPOLLIN | EPOLLET;  // 边缘触发!
                    client_ev.data.fd = client_fd;
                    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &client_ev);
                    
                    std::cout << "New client: " << client_fd << "\n";
                }
            }
            // ========== 客户端socket:有数据 ==========
            else if (ev & EPOLLIN) {
                char buffer[1024];
                int recv_len = recv(fd, buffer, sizeof(buffer) - 1, 0);
                
                if (recv_len > 0) {
                    buffer[recv_len] = '\0';
                    std::cout << "From " << fd << ": " << buffer << "\n";
                    send(fd, buffer, recv_len, 0);  // echo
                } else if (recv_len == 0) {
                    // 客户端主动关闭
                    std::cout << "Client " << fd << " disconnected\n";
                    close(fd);
                    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
                } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
                    // 真正的错误
                    perror("recv error");
                    close(fd);
                    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
                }
            }
        }
    }
    
    close(epoll_fd);
    close(listen_fd);
    return 0;
}

水平触发(LT)vs 边缘触发(ET)

这是Epoll最容易踩的坑。

flowchart LR
    subgraph LT["水平触发 Level Trigger"]
        A1[fd有数据] --"epoll_wait返回"--> B1[你只读了部分]
        B1 --"再调epoll_wait"--> C1[又返回!直到数据读完"]
        style C1 fill:#ffcccc
    end
    
    subgraph ET["边缘触发 Edge Trigger"]
        A2[fd有数据] --"epoll_wait返回一次"--> B2[你必须读完]
        B2 --"再调epoll_wait"--> C2[不返回!直到新数据来"]
        style B2 fill:#ccffcc
    end
水平触发 (LT)边缘触发 (ET)
触发条件fd有未读数据 → 一直报告数据"从无到有"或"从少到多" → 只报告一次
代码要求简单,读不读完无所谓必须把数据读完,否则丢失事件
性能可能重复触发触发次数最少
配合模式阻塞或非阻塞都可以必须非阻塞(否则可能在read中阻塞)

ET的陷阱:如果你recv()一次没读完缓冲区所有数据,epoll不会再次通知你——它觉得"我已经通知过了,你没读完是你的事"。

解决方案:ET模式下,收到EPOLLIN后,循环recv()直到返回EAGAIN

// ET模式下的正确读法
if (ev & EPOLLIN) {
    while (true) {
        int recv_len = recv(fd, buffer, sizeof(buffer), 0);
        if (recv_len > 0) {
            // 处理数据
        } else if (recv_len < 0 && errno == EAGAIN) {
            break;  // 读完了,真正的缓冲区空
        } else {
            // 断开或错误
            close(fd);
            break;
        }
    }
}

生产环境一般用ET + 非阻塞,因为触发次数最少,性能最高。但代价是代码更复杂,一不注意就丢事件。

Epoll的数据结构内幕(简化版)

Epoll为什么快?因为它在内核里用了红黑树 + 就绪链表:

用户空间                内核空间
   │                       │
   │  epoll_ctl(ADD, fd)   │
   │───────────────────────▶│
   │                       │    ┌─────────────────┐
   │                       │    │   红黑树 (RBTree) │
   │                       │    │  存储所有监控的fd  │
   │                       │    │  key=fd, value=epoll_event │
   │                       │    └─────────────────┘
   │                       │              │
   │  epoll_wait()         │              ▼
   │──────────────────────▶│    ┌─────────────────┐
   │                       │    │   就绪链表        │
   │◄──────────────────────│    │  网卡中断时,   │
   │  返回events[]         │    │  有数据的fd插到这里│
   │  (只含就绪的)        │    └─────────────────┘
  • 红黑树:存储所有被epoll监控的fd,增删查都是O(log n)。注册一次,长期有效。
  • 就绪链表:网卡收到数据 → 内核中断处理 → 把对应fd插到就绪链表。epoll_wait()只从链表取数据,不需要遍历所有fd。

这就是"O(就绪数量)"复杂度的来源。


四、Buffer设计:网络层的核心数据结构

TCP是字节流,没有消息边界。recv()可能给你半条消息、一条消息、或者一条半消息。Buffer的作用就是"攒数据,凑够一条完整消息再上交"。

一个最简单的Buffer

#include <vector>
#include <cstring>
#include <algorithm>

class Buffer {
public:
    static const size_t INITIAL_SIZE = 1024;
    
    Buffer() : buffer_(INITIAL_SIZE), read_index_(0), write_index_(0) {}
    
    // 可写字节数
    size_t writable_bytes() const {
        return buffer_.size() - write_index_;
    }
    
    // 可读字节数
    size_t readable_bytes() const {
        return write_index_ - read_index_;
    }
    
    // 写入数据(从recv来的)
    void append(const char* data, size_t len) {
        ensure_writable(len);
        memcpy(begin_write(), data, len);
        write_index_ += len;
    }
    
    // 读取数据(交给上层解析)
    const char* peek() const {
        return begin() + read_index_;
    }
    
    void retrieve(size_t len) {
        if (len < readable_bytes()) {
            read_index_ += len;
        } else {
            retrieve_all();
        }
    }
    
    void retrieve_all() {
        read_index_ = 0;
        write_index_ = 0;
    }
    
    // 获取可写位置的指针(给recv用)
    char* begin_write() {
        return begin() + write_index_;
    }
    
private:
    char* begin() { return buffer_.data(); }
    const char* begin() const { return buffer_.data(); }
    
    void ensure_writable(size_t len) {
        if (writable_bytes() < len) {
            // 如果有废弃空间,先整理
            if (read_index_ > 0) {
                size_t readable = readable_bytes();
                memmove(begin(), peek(), readable);
                read_index_ = 0;
                write_index_ = readable;
            }
            // 还不够就扩容
            if (writable_bytes() < len) {
                buffer_.resize(write_index_ + len);
            }
        }
    }
    
    std::vector<char> buffer_;
    size_t read_index_;   // 读指针
    size_t write_index_;  // 写指针
};

Buffer使用流程

sequenceDiagram
    participant Net as 网络层
    participant Buf as Buffer
    participant Parser as Packet解析器
    participant Logic as 业务逻辑
    
    Net->>Buf: recv() 得到 "[HALF_MSG_1]"
    Buf->>Buf: append(data, len)
    Note over Buf: 可读: 半条消息
    
    Net->>Buf: recv() 得到 "[HALF_MSG_1_END][MSG_2]"
    Buf->>Buf: append(data, len)
    Note over Buf: 可读: 一条半消息
    
    Parser->>Buf: peek() 检查长度字段
    Parser->>Buf: retrieve(msg1_len) 取走第一条
    Parser->>Logic: 解析后的消息对象
    
    Parser->>Buf: 检查剩余数据
    Note over Buf: 还有MSG_2的一半
    
    Net->>Buf: recv() 得到 "[MSG_2_REST]"
    Buf->>Buf: append
    Parser->>Buf: 现在MSG_2完整了,retrieve取走

生产级Buffer的考虑

  1. 内存池:频繁new/delete会碎片化。游戏服一般用定长内存池或ring buffer。
  2. 上限保护:恶意客户端不发完整消息头,Buffer无限增长。要设上限,超了就踢。
  3. 零拷贝:Linux有splice()sendfile(),数据不经过用户空间。但游戏协议通常需要解析,零拷贝场景少。
  4. 双Buffer:读Buffer + 写Buffer分离,避免读写竞争。

五、Packet设计:给字节流加上"边界"

Buffer解决"攒数据",Packet解决"怎么知道一条消息多长"。

最常见的协议头设计:长度 + 类型 + payload

┌─────────┬─────────┬─────────────────────┐
│ 长度(2B) │ 类型(2B) │     Payload(n B)     │
│  uint16  │  uint16  │    protobuf/json/...  │
└─────────┴─────────┴─────────────────────┘
         ↑                              ↑
    总长度 = 4 + n                  实际业务数据
#pragma pack(push, 1)  // 按1字节对齐,不要填充
struct PacketHeader {
    uint16_t len;      // 总长度(头+payload)
    uint16_t msg_type; // 消息类型,比如1=登录,2=移动,3=攻击...
};
#pragma pack(pop)

const uint16_t HEADER_SIZE = sizeof(PacketHeader);

Packet解析流程

bool parse_packet(Buffer& buf, uint16_t& out_type, std::string& out_payload) {
    // 1. 检查是否有完整的头
    if (buf.readable_bytes() < HEADER_SIZE) {
        return false;  // 数据不够,等下一次recv
    }
    
    // 2. 读取头(注意字节序!网络序要转主机序)
    const PacketHeader* header = reinterpret_cast<const PacketHeader*>(buf.peek());
    uint16_t total_len = ntohs(header->len);
    uint16_t msg_type = ntohs(header->msg_type);
    
    // 3. 检查是否有完整的payload
    if (buf.readable_bytes() < total_len) {
        return false;  // payload还没全到
    }
    
    // 4. 提取payload
    out_type = msg_type;
    out_payload.assign(buf.peek() + HEADER_SIZE, total_len - HEADER_SIZE);
    
    // 5. 从Buffer移除这条消息
    buf.retrieve(total_len);
    
    return true;
}

常见协议头变体

类型格式适用场景
固定长度4字节头(2B长度+2B类型)简单,最常用
分隔符\r\n\0分隔文本协议,如HTTP、Redis
自描述长度变长编码(如protobuf的varint)节省带宽
魔数+版本0xABCD魔数 + 版本号 + 长度防协议混淆,支持升级

六、protobuf协议定义与序列化

JSON对人类友好,对机器不友好——解析慢、体积大、类型弱。游戏服用protobuf是行业标配。

定义协议文件

// game.proto
syntax = "proto3";
package GameProto;

// 登录请求
message LoginReq {
    string account = 1;    // 账号
    string password = 2;   // 密码(实际应该是token或签名)
    int32  platform = 3;   // 平台:1=iOS, 2=Android, 3=PC
}

// 登录响应
message LoginResp {
    int32  result = 1;     // 0=成功,其他=错误码
    int64  player_id = 2;  // 玩家唯一ID
    string token = 3;      // 会话token
}

// 移动请求
message MoveReq {
    int32  scene_id = 1;   // 场景ID
    float  x = 2;          // 目标X坐标
    float  y = 3;          // 目标Y坐标
    float  z = 4;          // 目标Z坐标
}

// 移动广播(服务端推给周围玩家)
message MoveNotify {
    int64  player_id = 1;
    float  x = 2;
    float  y = 3;
    float  z = 4;
    int64  timestamp = 5;  // 服务器时间戳,用于校验和排序
}

C++使用protobuf

#include "game.pb.h"  // protoc生成的头文件

// ===== 序列化:对象 → 字节流 =====
GameProto::LoginReq req;
req.set_account("player_123");
req.set_password("hashed_token_here");
req.set_platform(2);

// 序列化到string
std::string serialized;
req.SerializeToString(&serialized);

// 封包:长度 + 类型 + protobuf payload
uint16_t msg_type = 1001;  // 假设1001是LoginReq
uint16_t total_len = HEADER_SIZE + serialized.size();

char packet[4096];
PacketHeader header;
header.len = htons(total_len);
header.msg_type = htons(msg_type);
memcpy(packet, &header, HEADER_SIZE);
memcpy(packet + HEADER_SIZE, serialized.data(), serialized.size());

send(client_fd, packet, total_len, 0);

// ===== 反序列化:字节流 → 对象 =====
// 假设buf已经通过Buffer和Packet解析拿到了payload
std::string payload = ...;  // 从Buffer提取的protobuf二进制数据

GameProto::LoginReq received_req;
if (received_req.ParseFromString(payload)) {
    std::cout << "Account: " << received_req.account() << "\n";
    std::cout << "Platform: " << received_req.platform() << "\n";
}

protobuf的优势

优势说明
紧凑变长编码,数字越小占字节越少
快速二进制解析,比JSON快10-100倍
类型安全编译期检查字段类型
向后兼容新增字段不破坏旧代码
多语言自动生成C++/Java/Python/Go…

工程建议

  1. 版本管理:proto文件是接口契约,改动要慎重。建议用Git管理,重大变更发版本号。
  2. 字段编号不要复用:protobuf靠字段编号识别字段,删掉一个字段后别复用它的编号——旧代码会误解。
  3. optional和repeated:proto3默认所有字段都是optional(不用显式标),但repeated字段要显式声明。
  4. 不要直接传密码:示例里的password字段只是为了演示。真实游戏里,客户端应该传登录token(比如SDK校验后的session),服务端永远不要信任客户端发的密码。

七、全书libserver框架的Network基类初探

彭放自研的libserver框架封装了Select和Epoll的差异。本章末尾,他把前面的所有概念整合成一个可扩展的架构:

libserver网络层架构(简化)

Network.h/cpp          # 纯虚基类,定义接口
├── NetworkSelect.h    # Select实现
└── NetworkEpoll.h     # Epoll实现

NetworkListen.h        # 监听端口
NetworkConnector.h     # 主动连接(连别的服)

Buffer.h               # 收发缓冲区
Packet.h               # 封包/解包

MessageQueue.h         # 网络层和业务层的消息队列

关键设计:网络线程和业务线程分离

flowchart LR
    subgraph NetThread["网络线程"]
        A1[epoll_wait] --"收数据"--> B1[Buffer]
        B1 --"凑够一条消息"--> C1[Packet解析]
        C1 --"放入队列"--> D1[MsgQueue]
    end
    
    subgraph BizThread["业务线程"]
        D1 --"取出消息"--> E1[业务逻辑处理]
        E1 --"生成回复"--> F1[MsgQueue]
    end
    
    subgraph NetThread2["网络线程"]
        F1 --"取出回复"--> G1[Packet序列化]
        G1 --"写入Buffer"--> H1[send]
    end

为什么要分离?

因为epoll_wait()是阻塞的(或带超时的),业务逻辑可能也是阻塞的(比如查数据库)。如果混在一起,一个玩家"发了条消息触发数据库查询",会导致整个epoll循环卡住,其他玩家都收不到消息。

分离后的原则:

  • 网络线程只管"收发数据"和"协议解析",不做业务
  • 业务线程从队列取消息,处理后塞回复到另一个队列
  • 队列是唯一的共享资源,用无锁队列或单生产者单消费者避免锁竞争

八、工程建议与常见坑点

✅ 做对的事

  1. 监听socket也用非阻塞+ET。高并发时,连接蜂拥而至,LT模式下可能触发多次。ET模式下一次accept要循环到EAGAIN,确保全接完。

  2. 设置SO_KEEPALIVE。TCP自带保活机制,但默认间隔太长(2小时)。游戏服应该在应用层做心跳,更快发现问题。

  3. 处理EINTRepoll_wait()可能被信号中断,返回-1且errno==EINTR。要重试,不能当错误退出。

  4. 控制epoll_wait的maxevents。1024够用吗?如果你预期单线程处理1万并发,设成maxevents = 10000。返回的数组要够大。

  5. 单线程Reactor vs 多线程Reactor。单线程epoll适合CPU密集度不高的场景(如聊天服)。游戏战斗服逻辑重,可以用"一个线程epoll收数据 + 多个线程处理业务"(多Reactors)。

❌ 常见的坑

  1. ET模式下没读完数据。这是Epoll ET的头号杀手bug。表现是:客户端发了消息,服务端不触发——因为上一次触发时你没读完,epoll认为"已经通知过了"。

  2. 在epoll事件处理里做阻塞操作。比如收到登录消息,立刻查MySQL验证密码。这条连接的处理被卡住了,但更糟糕的是——如果你是用单线程Reactor,所有连接都跟着卡

  3. 忘记处理EPOLLOUT。非阻塞模式下,如果发送缓冲区满,send()会返回EAGAIN。这时候要注册EPOLLOUT事件,等内核通知"可写了"再发。很多示例代码为了简单忽略了这点,但生产环境不能忽略。

  4. fd泄漏close(fd)忘了epoll_ctl(DEL),或者反过来。epoll会监控一个已关闭的fd吗?不会,但你的数据结构里可能还存着引用,导致逻辑混乱。

  5. 大小端问题。x86是小端,网络协议是大端。所有多字节字段发出去之前都要htons()/htonl(),收到之后都要ntohs()/ntohl()。漏一次,调试到死。


九、Select vs Epoll:终极对比

flowchart TD
    subgraph SelectDetail["Select"]
        S1["O(max_fd) 扫描"] 
        S2["fd_set拷贝开销"]
        S3["默认1024限制"]
        S4["跨平台"]
    end
    
    subgraph EpollDetail["Epoll"]
        E1["O(就绪数) 事件驱动"]
        E2["mmap共享内存"]
        E3["无fd上限"]
        E4["Linux only"]
    end
    
    SelectDetail --> S5["适用: <100连接\n教学/兼容Windows"]
    EpollDetail --> E5["适用: >1000连接\n所有生产环境"]
场景推荐
学习理解IO多路复用原理Select
写跨平台代码(Win+Linux)Select(或libevent/libuv封装)
生产环境Linux游戏服Epoll,没有例外
10万+并发Epoll + 多线程 + 多进程

十、本章核心回顾

  1. IO多路复用的目的:避免轮询所有socket,只处理"有事的"
  2. Select:位图+线性扫描,教学用,生产环境别用
  3. Epoll:红黑树+就绪链表,O(就绪数),生产唯一选择
  4. LT vs ET:ET性能高但容易丢事件,必须循环读到EAGAIN
  5. Buffer:解决TCP字节流没有消息边界的问题
  6. Packet:给消息加头(长度+类型),让Buffer知道"凑够多少算一条"
  7. protobuf:游戏协议的标配,紧凑、快速、类型安全
  8. 网络线程和业务线程必须分离:epoll循环不能被业务阻塞

十一、进阶作业

基于本章代码,实现一个功能完整的Echo服务器升级:

  1. 用Epoll ET模式,正确处理accept和recv的循环
  2. 实现Buffer + Packet层,协议头用2B长度 + 2B类型
  3. 用protobuf定义消息:LoginReq/LoginResp/ChatMsg
  4. 支持多客户端:每个客户端分配player_id,聊天消息广播给所有人
  5. 添加心跳检测:30秒没收到心跳就踢掉

这个作业的代码量大约是本章示例的3-5倍,但完成后你就拥有了一个"可以写进简历"的雏形游戏聊天服务器。


Select到Epoll的演进,本质上是"从’我查你’到’你通知我’的权力转移。
这个转移在游戏服务器里至关重要——因为你要查的不是10个fd,是10万个。
当你开始用Epoll,才终于可以说:"这台服务器, ready for production."

"能处理100个连接的叫demo,能处理10万个连接的叫产品。" 🖤