文一,应急管理大学在读本科生,中国 PostgreSQL 分会实训基地秘书长,360 基础架构部 pikiwidb.com 设计者,PikiwiDB 与 IvorySQL 代码贡献者,MOP 社区会员,《数据库红皮书》(线上读物)译者,曾经在中国科学院,北京航空航天大学等地展开 PostgreSQL 内核相关的技术分享工作。
事务,是数据库领域中的重中之重,是一种对于 ACID(原子性、一致性、隔离性、持久性)的具体实现方案,在 Redis 中,我们可以通过 MULTI
(发起一个事务)、EXEC
(提交并执行一个事务)、DISCARD
(取消一个事务)、WATCH
(观察某个元素的变化)等指令指导事务模块展开工作,就比如:
MULTI
SET Key_2021 "Hello World 2021"
SET Key_2022 "Hello World 2022"
Get Key_2022
SET Key_2023 "Hello World 2023"
SET Key_2024 "Hello World 2024"
EXEC
具体的执行结果如下:

可以发现:
在 Redis 事务中的指令,并不会被立刻执行,他们会被放置于一个队列之中,待到
EXEC
指令发起之后,才会逐条执行(这就和 PostgreSQL 是不一致的,在 PostgreSQL 的基于 MVCC 的事务模型之中,同一份数据会根据时间顺序,即事务 ID 的先后划分出多个不同的版本,并最终根据隔离级别的不同,决定是否呈现给某个事务,也正是因为如此,倘若我们在 PostgreSQL 修改某一个数据项,在隔离级别允许以及其它事务没有冲突性提交的情况下,其结果会立刻呈现出来)无论是数据的读取还是数据的写入,其得到的结果都会是
QUEUED
,即指令已经被放入队列之中(甚至于只是回显用户输入的 ECHO 指令,也都不能够立即反馈,也都需要排入队列才可以交付执行)
而我们建立了这种了解之后,就可以展开具体的进一步的工作,首先,正如同我们在《Redis 的指令表实现机制简介》(https://datapromoto.atomgit.com/explore/journalism/detail/358934497930121216)所介绍的那样,所有的 Redis 指令,实际上就对应着一个对接的C语言函数,因此,理解事务指令所对应的C语言函数的具体行为,就可以帮助我们,建立对于 Redis 的事务的更为积极的了解。
具体分析:Redis MULTI 指令
根据 Redis 指令的命名规律(指令名称小写配合 Command 后缀,往往就是对于某种指令的C语言对接函数),我们可以非常顺利地找到 multiCommand
这个函数,它就是实现 MULTI
指令的对接函数,其代码参考如下:
/* 代码来源于 src/multi.c */
void multiCommand(client *c) {
/*
如果用于抽象化客户端的对象已经拥有事务标记,
说明已经处于事务状态中,
报错并直接返回即可
*/
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"MULTI calls can not be nested");
return;
}
/* 如果没有事务标记,则加上,并汇报成功 */
c->flags |= CLIENT_MULTI;
addReply(c,shared.ok);
}
可以发现,MULTI
指令的核心,便是加上事务标记,除此之外没有展开其它的工作,但是这样子,读者难免疑惑,因为只是通过位运算加上标记,是不可能做到让其它指令在事务状态时,选择进入指令队列,而不是直接交付执行的。
这就说明,我们还需要结合其它方面的逻辑,才可以把这个问题说明清白,因此,我们将把目光聚焦于 processCommand
上面,它是 Redis 中负责处理指令执行的函数,参考下面的内容:
/*
代码来源于 src/server.c
根据实际情况做了修订
*/
int processCommand(client *c) {
// ...
/* 判断客户端是否处于事务状态之中,且待执行指令非事务相关 */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand &&
c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand &&
c->cmd->proc != watchCommand &&
c->cmd->proc != quitCommand &&
c->cmd->proc != resetCommand)
{
/* 处于事务状态之中,则将指令放入队列,并通报用户 */
queueMultiCommand(c, cmd_flags);
addReply(c,shared.queued);
} else {
/* 并非处于事务状态之中,则直接执行指令 */
int flags = CMD_CALL_FULL;
if (client_reprocessing_command) flags |= CMD_CALL_REPROCESSING;
call(c,flags);
// ...
}
// ...
}
而当我们把目光聚焦于 queueMultiCommand
上:
/*
来源于 src/multi.c
负责将指令放置进入到指令队列之中
*/
void queueMultiCommand(client *c, uint64_t cmd_flags) {
multiCmd *mc;
/* 当 Watch 指定的数据键或者事务本身出错的时候,直接返回 */
if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))
return;
/*
客户端对象存储着事务队列指令的数量
在初始情况下,即没有指令,数量为 0 的情况下
Redis 会将其直接设置为 2
因为假定要原子化的指令至少有两条
(这是非常符合逻辑的,因为单条指令本身就是很难见缝插针的)
*/
if (c->mstate.count == 0) {
c->mstate.commands = zmalloc(sizeof(multiCmd)*2);
c->mstate.alloc_count = 2;
}
/*
如果超出容量限制,按照2的倍数进行容量拓展
*/
if (c->mstate.count == c->mstate.alloc_count) {
c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX/2 ? c->mstate.alloc_count*2 : INT_MAX;
c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.alloc_count));
}
/*
最终扫尾的工作,因为队列指令将会被缓存到抽象化的客户端对象之中
因此这里做的就是更新统计数据
*/
mc = c->mstate.commands+c->mstate.count;
mc->cmd = c->cmd;
mc->argc = c->argc;
mc->argv = c->argv;
mc->argv_len = c->argv_len;
c->mstate.count++;
c->mstate.cmd_flags |= cmd_flags;
c->mstate.cmd_inv_flags |= ~cmd_flags;
c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc;
/*
清理客户端传入的 argc, argv
因为已经存储进入 mstate,即事务队列之中
*/
c->argv = NULL;
c->argc = 0;
c->argv_len_sum = 0;
c->argv_len = 0;
}
就可以发现,Redis 的事务实现,实际上非常清晰,就是客户端在服务端做一个登记,表示自身进入事务处理状态,随后服务端方面就开始对后续传入的指令展开进一步的记录工作。
这就引出了我们对于 EXEC
指令的分析工作,参考下面的内容。
具体分析 Redis EXEC 指令
/* src/multi.c */
void execCommand(client *c) {
int j;
robj **orig_argv;
int orig_argc, orig_argv_len;
struct redisCommand *orig_cmd;
/* 客户端并没有处于事务状态下,返回 */
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"EXEC without MULTI");
return;
}
/* 监视的数据键过期,状态变化,加上相关状态,后续停止事务执行 */
if (isWatchedKeyExpired(c)) {
c->flags |= (CLIENT_DIRTY_CAS);
}
/* 在 Watch 的数据键发生改变,事务内指令出现错误时,不执行事务 */
if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) {
if (c->flags & CLIENT_DIRTY_EXEC) {
addReplyErrorObject(c, shared.execaborterr);
} else {
addReply(c, shared.nullarray[c->resp]);
}
discardTransaction(c);
return;
}
uint64_t old_flags = c->flags;
/*
Redis 希望事务是短平快的,因此不会允许长期阻塞执行的指令存在,
这点和 PostgreSQL 是很不一样的
*/
c->flags |= CLIENT_DENY_BLOCKING;
/* 取消客户端所有与 Watch 相关的数据键的监视状态 */
unwatchAllKeys(c);
/* 进行标记,代表 Redis 服务端在做执行工作 */
server.in_exec = 1;
orig_argv = c->argv;
orig_argv_len = c->argv_len;
orig_argc = c->argc;
orig_cmd = c->cmd;
/* 根据事务队列中存储的指令数量,初步构建反馈结果 */
addReplyArrayLen(c,c->mstate.count);
/* 逐条遍历指令 */
for (j = 0; j < c->mstate.count; j++) {
// ...
/* 检测权限,因为 ACL 目前不在我们的关注范围内,故不讨论 */
int acl_errpos;
int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
if (acl_retval != ACL_OK) {
/* 不具有相关权限,则不执行 */
// ...
} else {
/* 不具有相关权限,则执行之 */
if (c->id == CLIENT_ID_AOF) {
/*
为 AOF 日志而准备,则不向子节点传输日志,
也不记录 SlowLog 和 内存使用状态
*/
call(c,CMD_CALL_NONE);
}
else {
/* 按照一般指令办法对待之 */
call(c,CMD_CALL_FULL);
}
// ...
}
// ...
}
/*
倘若旧有标志位并不阻挡长执行指令,则不阻挡,因为事务结束
又可以处理长周期指令了
*/
if (!(old_flags & CLIENT_DENY_BLOCKING))
c->flags &= ~CLIENT_DENY_BLOCKING;
/* 扫尾工作,宣告客户端和服务端的事务状态结束 */
c->argv = orig_argv;
c->argv_len = orig_argv_len;
c->argc = orig_argc;
c->cmd = c->realcmd = orig_cmd;
discardTransaction(c);
server.in_exec = 0;
}
具体分析 Redis WATCH 指令
这样,我们也就对 Redis 的事务实现建立了一个基本的了解,但是仅仅是这两条指令还是不够的,因为我们还并不清楚什么是 Watch
指令,没有理解 Redis 是如何监测数据项的变化的,因此,请让我们将目光聚焦于 watchCommand
之上,参考下面的代码:
void watchCommand(client *c) {
int j;
/* Watch 只能够在事务之外做执行 */
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"WATCH inside MULTI is not allowed");
return;
}
/*
如果已经有监测的数据键已经被修改,其它的随之无意义,
直接反馈 OK
*/
if (c->flags & CLIENT_DIRTY_CAS) {
addReply(c,shared.ok);
return;
}
/* 在服务端登记要监视的数据键,再反馈 OK */
for (j = 1; j < c->argc; j++)
watchForKey(c,c->argv[j]);
addReply(c,shared.ok);
}
之后,我们继续深入阅读 watchForKey
函数:
void watchForKey(client *c, robj *key) {
list *clients = NULL;
listIter li;
listNode *ln;
watchedKey *wk;
/* 服务端需要掌握需要监视数据键的客户端的数量 */
if (listLength(c->watched_keys) == 0) server.watching_clients++;
/*
如果已经监视,则返回
客户端需要监测的数据键存储于客户端对象自身上
此处做的本质上就是列表遍历工作
*/
listRewind(c->watched_keys,&li);
while((ln = listNext(&li))) {
wk = listNodeValue(ln);
if (wk->db == c->db && equalStringObjects(key,wk->key))
return;
}
/*
Redis 按照 Database(命名空间)-Dict(动态哈希表)
的方式组织数据与管理数据
此处展开的就是登记工作
*/
clients = dictFetchValue(c->db->watched_keys,key);
if (!clients) {
clients = listCreate();
dictAdd(c->db->watched_keys,key,clients);
incrRefCount(key);
}
/* wk 用于代指被监视的数据键 */
wk = zmalloc(sizeof(*wk));
wk->key = key;
wk->client = c;
wk->db = c->db;
wk->expired = keyIsExpired(c->db, key);
/* 增添引用计数,进入监视列表 */
incrRefCount(key);
listAddNodeTail(c->watched_keys, wk);
watchedKeyLinkToClients(clients, wk);
}
而被监视的数据键,检测其是否改变,对应的函数即为 signalModifiedKey
,它在 Redis 将会调用得到的地方很多,而它的策略也非常简单,只要将要修订的键是被监测的键,那么就算做修订行为发生。
因此,即使我们将数据设置的一模一样,也没有任何的意义,如下所示:

(事务照样执行失败)
由此,我们也就通过三个基本的指令,对于 Redis 的轻量级事务模型建立了了解,它将对我们的后续工作提供有意义的指导。
写在最后
目前,我们正在推进“数据库内核一周一审”的建设,希望博众家之所长,打造一个相互学习,相互敬重的数据库内核研发生态,欢迎更多的朋友们参与到这项工作中来,齐心协力地推动中国数据库内核的发展。
感谢应急管理大学袁国铭主任、中国 PostgreSQL 分会魏波老师、王其达老师、开放原子开源基金会李少辉老师、张凯老师、李永老师、李明康老师原 PikiwiDB 社区于雨老师、Eagle 社区李明宇老师、青学会吴洋老师、IvorySQL 社区任娇老师、牛世继老师、阿里周正中老师、OpenTenBase 社区符芬菊老师、@红发哥、@Makio、@可可、@轩轩、@进化哥、@不语哥、@念念哥、@smx123、@奶啤、@拉黑哥、@奇迹银河狮子狗、@川渝卷王、@Cngal、@默然、@且听风吟的帮助和指导。




