mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
645 字
2 分钟
Redis 请求处理流程:从命令到响应
2023-12-01

前言#

Redis 以其极高的性能著称,单线程却能处理 10 万+ QPS。本文深入剖析 Redis 的请求处理流程,揭示其高性能的设计原理。

Redis 架构概览#

flowchart TB subgraph 客户端 C1[Client 1] C2[Client 2] C3[Client N] end subgraph Redis Server A[事件循环<br/>Event Loop] --> B[IO 多路复用] B --> C[文件事件处理器] C --> D[命令解析器] D --> E[命令执行器] E --> F[数据库] E --> G[响应构建器] end C1 --> B C2 --> B C3 --> B

一、事件驱动模型#

1.1 事件循环#

Redis 使用事件循环处理所有事件:

// Redis 事件循环(简化)
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 1. 处理到期的定时事件
aeProcessTimeEvents(eventLoop);
// 2. 等待并处理文件事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
flowchart TB A[事件循环] --> B{有文件事件?} B -->|是| C[处理文件事件] B -->|否| D[处理定时事件] C --> E[执行事件处理器] D --> E E --> F{还有事件?} F -->|是| B F -->|否| G[等待新事件] G --> B

1.2 IO 多路复用#

Redis 支持多种 IO 多路复用实现:

// 根据平台选择最优实现
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
实现平台特点
epollLinux高性能,支持大量连接
kqueueBSD/macOS高性能
evportSolaris高性能
select跨平台性能较差,连接数限制

1.3 epoll 示例#

// epoll API
int epfd = epoll_create(1024); // 创建 epoll 实例
// 添加监听事件
struct epoll_event ee;
ee.events = EPOLLIN; // 监听可读事件
ee.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ee);
// 等待事件
struct epoll_event events[1024];
int n = epoll_wait(epfd, events, 1024, timeout);
for (int i = 0; i < n; i++) {
// 处理就绪的 fd
handleEvent(events[i]);
}

二、文件事件处理器#

2.1 事件类型#

// 文件事件类型
#define AE_READABLE 1 // 可读
#define AE_WRITABLE 2 // 可写
// 事件处理器结构
typedef struct aeFileEvent {
int mask; // 事件类型掩码
aeFileProc *rfileProc; // 读事件处理器
aeFileProc *wfileProc; // 写事件处理器
void *clientData; // 客户端数据
} aeFileEvent;

2.2 连接建立流程#

sequenceDiagram participant C as Client participant S as Server participant A as Accept Handler participant R as Read Handler Note over S: 监听端口 6379 C->>S: TCP 连接请求 S->>A: 触发可读事件 A->>A: accept() 接受连接 A->>A: 创建 client 结构 A->>S: 注册读事件处理器 Note over S: 等待客户端命令 C->>S: 发送命令数据 S->>R: 触发可读事件 R->>R: read() 读取数据 R->>R: 解析命令 R->>R: 执行命令 R->>R: 发送响应

2.3 客户端结构#

// Redis 客户端结构(简化)
typedef struct client {
int fd; // 套接字描述符
redisDb *db; // 当前数据库
robj *name; // 客户端名称
sds querybuf; // 查询缓冲区
int argc; // 参数个数
robj **argv; // 参数数组
struct redisCommand *cmd; // 要执行的命令
// 输出缓冲区
int bufpos; // 缓冲区位置
char buf[PROTO_REPLY_CHUNK_BYTES]; // 固定缓冲区
list *reply; // 可变缓冲区列表
// 状态标志
int flags;
// 认证信息
int authenticated;
// 事务
multiState mstate;
// 订阅
dict *pubsub_channels;
list *pubsub_patterns;
} client;

三、命令解析#

3.1 RESP 协议#

Redis 使用 RESP(REdis Serialization Protocol)协议:

*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
解析为:
["SET", "key", "value"]

协议类型

类型标识说明示例
+简单字符串+OK\r\n
-错误-ERR unknown command\r\n
:整数:1000\r\n
$批量字符串$6\r\nfoobar\r\n
*数组*2\r\n3\nfoo˚\n˚3\r\nfoo\r\n3\r\nbar\r\n

3.2 解析流程#

flowchart TB A[接收数据] --> B[写入 querybuf] B --> C{解析行} C -->|数组| D[解析元素个数] D --> E[解析每个元素] E --> F{所有元素解析完?} F -->|否| C F -->|是| G[填充 argv] G --> H[准备执行]
// 解析命令(简化)
int processMultibulkBuffer(client *c) {
char *newline = NULL;
int pos = 0, ok;
long ll;
// 解析数组长度 *<count>\r\n
if (c->multibulklen == 0) {
newline = strchr(c->querybuf, '\r');
ok = string2ll(c->querybuf + 1, newline - c->querybuf - 1, &ll);
c->multibulklen = ll;
c->argc = ll;
c->argv = zmalloc(sizeof(robj*) * c->argc);
pos = newline - c->querybuf + 2;
}
// 解析每个参数
while (c->multibulklen) {
// 解析参数长度 $<len>\r\n
// 解析参数数据 <data>\r\n
// ...
}
return C_OK;
}

3.3 命令查找#

// 命令表(哈希表)
struct redisCommand redisCommandTable[] = {
{"get", getCommand, 2, "r", 0, NULL, 1, 1, 1, 0, 0},
{"set", setCommand, -3, "wm", 0, NULL, 1, 1, 1, 0, 0},
{"del", delCommand, -2, "w", 0, NULL, 1, -1, 1, 0, 0},
// ...
};
// 查找命令
struct redisCommand *lookupCommand(sds name) {
return dictFetchValue(server.commands, name);
}

四、命令执行#

4.1 执行流程#

flowchart TB A[获取命令] --> B{命令存在?} B -->|否| C[返回错误] B -->|是| D{参数个数正确?} D -->|否| E[返回参数错误] D -->|是| F{有权限?} F -->|否| G[返回权限错误] F -->|是| H{内存限制?} H -->|是| I[内存淘汰] H -->|否| J[执行命令] I --> J J --> K[记录慢日志] K --> L[发送响应]

4.2 命令结构#

struct redisCommand {
char *name; // 命令名
redisCommandProc *proc; // 命令实现函数
int arity; // 参数个数,-N 表示至少 N 个
char *sflags; // 字符串标志
int flags; // 整数标志
redisGetKeysProc *getkeys_proc; // 获取键的函数
int firstkey; // 第一个键的位置
int lastkey; // 最后一个键的位置
int keystep; // 键的步长
long long microseconds, calls; // 统计信息
};

命令标志

标志说明
w写命令
r读命令
m可能改变内存使用
a管理员命令
pPub/Sub 命令
s不允许在 Lua 脚本中执行
R随机命令

4.3 SET 命令示例#

// SET 命令实现(简化)
void setCommand(client *c) {
robj *key = c->argv[1];
robj *val = c->argv[2];
// 解析选项:NX, XX, EX, PX
int flags = 0;
long long milliseconds = 0;
for (int j = 3; j < c->argc; j++) {
if (!strcasecmp(c->argv[j]->ptr, "nx")) {
flags |= SET_NX;
} else if (!strcasecmp(c->argv[j]->ptr, "xx")) {
flags |= SET_XX;
} else if (!strcasecmp(c->argv[j]->ptr, "ex")) {
// EX 秒
} else if (!strcasecmp(c->argv[j]->ptr, "px")) {
// PX 毫秒
}
}
// 设置键值
genericSetKey(c->db, key, val, flags);
// 设置过期时间
if (milliseconds) {
setExpire(c->db, key, mstime() + milliseconds);
}
// 发送响应
addReply(c, shared.ok);
}

4.4 内存淘汰#

// 内存淘汰策略
#define MAXMEMORY_FLAG_LRU (1<<0)
#define MAXMEMORY_FLAG_LFU (1<<1)
#define MAXMEMORY_FLAG_ALLKEYS (1<<2)
// 淘汰策略
typedef enum {
MAXMEMORY_VOLATILE_LRU,
MAXMEMORY_VOLATILE_LFU,
MAXMEMORY_VOLATILE_TTL,
MAXMEMORY_VOLATILE_RANDOM,
MAXMEMORY_ALLKEYS_LRU,
MAXMEMORY_ALLKEYS_LFU,
MAXMEMORY_ALLKEYS_RANDOM,
MAXMEMORY_NO_EVICTION
} maxmemory_policy_t;
// 执行淘汰
int freeMemoryIfNeeded(void) {
while (server.maxmemory &&
zmalloc_used_memory() > server.maxmemory) {
// 根据策略选择要删除的键
// 删除键
}
return C_OK;
}

五、响应构建#

5.1 输出缓冲区#

flowchart LR A[命令执行] --> B{响应大小} B -->|小| C[固定缓冲区<br/>16KB] B -->|大| D[可变缓冲区<br/>链表] C --> E[等待发送] D --> E
// 添加响应
void addReply(client *c, robj *obj) {
// 检查是否需要安装写处理器
if (prepareClientToWrite(c) != C_OK) return;
// 根据响应大小选择缓冲区
if (sdslen(obj->ptr) < PROTO_REPLY_CHUNK_BYTES) {
// 使用固定缓冲区
if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != C_OK) {
// 失败则使用可变缓冲区
_addReplyStringToList(c, obj->ptr, sdslen(obj->ptr));
}
} else {
// 使用可变缓冲区
_addReplyStringToList(c, obj->ptr, sdslen(obj->ptr));
}
}

5.2 响应发送#

// 发送响应
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = privdata;
int nwritten = 0, totwritten = 0;
while (c->bufpos > 0 || listLength(c->reply)) {
// 发送固定缓冲区
if (c->bufpos > 0) {
nwritten = write(fd, c->buf + c->sentlen, c->bufpos - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
if (c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
}
// 发送可变缓冲区
if (listLength(c->reply)) {
// ...
}
}
// 所有数据发送完毕,移除写处理器
if (c->bufpos == 0 && listLength(c->reply) == 0) {
aeDeleteFileEvent(el, c->fd, AE_WRITABLE);
}
}

六、单线程模型#

6.1 为什么单线程如此快#

flowchart TB A[高性能原因] --> B[纯内存操作] A --> C[IO 多路复用] A --> D[非阻塞 IO] A --> E[无锁设计] A --> F[简单数据结构]
因素说明
纯内存磁盘 IO 是最大瓶颈,Redis 数据在内存中
IO 多路复用单线程处理多个连接,无上下文切换
非阻塞 IO不会因单个慢客户端阻塞其他客户端
单线程无锁竞争、无上下文切换
高效数据结构SDS、跳表、整数集合等

6.2 单线程的限制#

flowchart LR A[慢命令] --> B[阻塞事件循环] B --> C[影响所有客户端] D[大键操作] --> E[内存重分配] E --> F[延迟增加]

避免的操作

# 危险操作
KEYS * # O(N),全量扫描
HGETALL bigkey # 大键,内存重分配
DEL bigkey # 删除大键,阻塞

安全替代

# 安全替代
SCAN 0 MATCH pattern COUNT 100 # 分批扫描
HSCAN key 0 COUNT 100 # 分批获取
UNLINK bigkey # 异步删除

七、多线程扩展#

7.1 Redis 6.0 多线程 IO#

Redis 6.0 引入多线程处理网络 IO:

flowchart TB subgraph 主线程 A[事件循环] --> B[命令解析] B --> C[命令执行] C --> D[响应构建] end subgraph IO 线程池 E[线程 1: 读取数据] F[线程 2: 读取数据] G[线程 N: 读取数据] H[线程 1: 发送数据] I[线程 2: 发送数据] J[线程 N: 发送数据] end E --> B F --> B G --> B D --> H D --> I D --> J

配置

# redis.conf
io-threads 4
io-threads-do-reads yes

7.2 多线程读写流程#

// 多线程读取
int handleClientsWithPendingReadsUsingThreads(void) {
// 1. 将客户端分配给 IO 线程
listRewind(server.clients_pending_read, &li);
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int item_id = item_id++ % server.io_threads_num;
listAddNodeTail(io_threads_list[item_id], c);
}
// 2. 唤醒 IO 线程
for (int j = 1; j < server.io_threads_num; j++) {
int processed = listLength(io_threads_list[j]);
io_threads_pending[j] = processed;
}
// 3. 主线程也处理一部分
while (listLength(io_threads_list[0])) {
client *c = listFirst(io_threads_list[0])->value;
readQueryFromClient(NULL, c->fd, c, 0);
}
// 4. 等待所有 IO 线程完成
while (1) {
int pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (!pending) break;
}
return processed;
}

八、持久化#

8.1 RDB 快照#

sequenceDiagram participant C as Client participant M as 主进程 participant S as 子进程 participant D as 磁盘 C->>M: BGSAVE M->>M: fork() 创建子进程 Note over M: 主进程继续服务 S->>S: 遍历数据库 S->>D: 写入 RDB 文件 D-->>M: 写入完成信号 M-->>C: BGSAVE 成功

8.2 AOF 日志#

flowchart TB A[执行命令] --> B[追加到 AOF 缓冲区] B --> C{刷盘策略} C -->|always| D[每次写入都刷盘] C -->|everysec| E[每秒刷盘] C -->|no| F[由操作系统决定] G[AOF 重写] --> H[fork 子进程] H --> I[重写数据库状态] I --> J[替换旧 AOF]

AOF 配置

# redis.conf
appendonly yes
appendfsync everysec
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

九、性能监控#

9.1 慢查询日志#

# 配置慢查询
CONFIG SET slowlog-log-slower-than 10000 # 10 毫秒
CONFIG SET slowlog-max-len 128
# 查看慢查询
SLOWLOG GET 10

9.2 延迟监控#

# 开启延迟监控
CONFIG SET latency-monitor-threshold 100
# 查看延迟事件
LATENCY LATEST
LATENCY HISTORY command

9.3 INFO 命令#

# 服务器信息
INFO server
# 内存信息
INFO memory
# 统计信息
INFO stats
# 客户端信息
INFO clients

总结#

Redis 请求处理完整流程#

flowchart TB A[客户端发送命令] --> B[IO 多路复用检测] B --> C[触发可读事件] C --> D[读取数据到 querybuf] D --> E[解析 RESP 协议] E --> F[查找命令] F --> G[执行命令] G --> H[构建响应] H --> I[写入输出缓冲区] I --> J[触发可写事件] J --> K[发送响应到客户端]

关键要点#

  1. 事件驱动:单线程事件循环处理所有事件
  2. IO 多路复用:epoll/kqueue 高效处理并发连接
  3. RESP 协议:简单高效的序列化协议
  4. 单线程执行:无锁竞争,命令顺序执行
  5. 多线程 IO:Redis 6.0+ 网络读写多线程化

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

Redis 请求处理流程:从命令到响应
https://blog.souloss.com/posts/principles/redis-request-handling-process/
作者
Souloss
发布于
2023-12-01
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时