暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Redis 事务功能简介

文一,应急管理大学在读本科生,中国 PostgreSQL 分会实训基地秘书长,360 基础架构部 pikiwidb.com 设计者,PikiwiDB 与 IvorySQL 代码贡献者,MOP 社区会员,《数据库红皮书》(线上读物)译者,曾经在中国科学院,北京航空航天大学等地展开 PostgreSQL 内核相关的技术分享工作。

事务,是数据库领域中的重中之重,是一种对于 ACID(原子性、一致性、隔离性、持久性)的具体实现方案,在 Redis 中,我们可以通过 MULTI
(发起一个事务)、EXEC
(提交并执行一个事务)、DISCARD
(取消一个事务)、WATCH
(观察某个元素的变化)等指令指导事务模块展开工作,就比如:

MULTI
SET Key_2021 "Hello World 2021"
SET Key_2022 "Hello World 2022"
Get Key_2022
SET Key_2023 "Hello World 2023"
SET Key_2024 "Hello World 2024"
EXEC

具体的执行结果如下:

可以发现:

  1. 在 Redis 事务中的指令,并不会被立刻执行,他们会被放置于一个队列之中,待到 EXEC
    指令发起之后,才会逐条执行(这就和 PostgreSQL 是不一致的,在 PostgreSQL 的基于 MVCC 的事务模型之中,同一份数据会根据时间顺序,即事务 ID 的先后划分出多个不同的版本,并最终根据隔离级别的不同,决定是否呈现给某个事务,也正是因为如此,倘若我们在 PostgreSQL 修改某一个数据项,在隔离级别允许以及其它事务没有冲突性提交的情况下,其结果会立刻呈现出来)

  2. 无论是数据的读取还是数据的写入,其得到的结果都会是 QUEUED
    ,即指令已经被放入队列之中(甚至于只是回显用户输入的 ECHO 指令,也都不能够立即反馈,也都需要排入队列才可以交付执行)

而我们建立了这种了解之后,就可以展开具体的进一步的工作,首先,正如同我们在《Redis 的指令表实现机制简介》(https://datapromoto.atomgit.com/explore/journalism/detail/358934497930121216)所介绍的那样,所有的 Redis 指令,实际上就对应着一个对接的C语言函数,因此,理解事务指令所对应的C语言函数的具体行为,就可以帮助我们,建立对于 Redis 的事务的更为积极的了解。

具体分析:Redis MULTI 指令

根据 Redis 指令的命名规律(指令名称小写配合 Command 后缀,往往就是对于某种指令的C语言对接函数),我们可以非常顺利地找到 multiCommand
这个函数,它就是实现 MULTI
指令的对接函数,其代码参考如下:

/* 代码来源于 src/multi.c */
void multiCommand(client *c) {
   /*
       如果用于抽象化客户端的对象已经拥有事务标记,
       说明已经处于事务状态中,
       报错并直接返回即可
   */
   if (c->flags & CLIENT_MULTI) {
       addReplyError(c,"MULTI calls can not be nested");
       return;
  }
   /* 如果没有事务标记,则加上,并汇报成功 */
   c->flags |= CLIENT_MULTI;

   addReply(c,shared.ok);
}

可以发现,MULTI
指令的核心,便是加上事务标记,除此之外没有展开其它的工作,但是这样子,读者难免疑惑,因为只是通过位运算加上标记,是不可能做到让其它指令在事务状态时,选择进入指令队列,而不是直接交付执行的。

这就说明,我们还需要结合其它方面的逻辑,才可以把这个问题说明清白,因此,我们将把目光聚焦于 processCommand
上面,它是 Redis 中负责处理指令执行的函数,参考下面的内容:

/* 
   代码来源于 src/server.c
   根据实际情况做了修订
*/
int processCommand(client *c) {
   // ...
   /* 判断客户端是否处于事务状态之中,且待执行指令非事务相关 */
   if (c->flags & CLIENT_MULTI &&
       c->cmd->proc != execCommand &&
       c->cmd->proc != discardCommand &&
       c->cmd->proc != multiCommand &&
       c->cmd->proc != watchCommand &&
       c->cmd->proc != quitCommand &&
       c->cmd->proc != resetCommand)
  {
       /* 处于事务状态之中,则将指令放入队列,并通报用户 */
       queueMultiCommand(c, cmd_flags);
       addReply(c,shared.queued);
  } else {
       /* 并非处于事务状态之中,则直接执行指令 */
       int flags = CMD_CALL_FULL;
       if (client_reprocessing_command) flags |= CMD_CALL_REPROCESSING;
       call(c,flags);
       // ...
  }
   // ...
}

而当我们把目光聚焦于 queueMultiCommand
上:

/* 
   来源于 src/multi.c
   负责将指令放置进入到指令队列之中
*/
void queueMultiCommand(client *c, uint64_t cmd_flags) {
   multiCmd *mc;

   /* 当 Watch 指定的数据键或者事务本身出错的时候,直接返回 */
   if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))
       return;
   /*
       客户端对象存储着事务队列指令的数量
           在初始情况下,即没有指令,数量为 0 的情况下
           Redis 会将其直接设置为 2
           因为假定要原子化的指令至少有两条
           (这是非常符合逻辑的,因为单条指令本身就是很难见缝插针的)

    */
   if (c->mstate.count == 0) {
       c->mstate.commands = zmalloc(sizeof(multiCmd)*2);
       c->mstate.alloc_count = 2;
  }
   /*
       如果超出容量限制,按照2的倍数进行容量拓展
   */
   if (c->mstate.count == c->mstate.alloc_count) {
       c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX/2 ? c->mstate.alloc_count*2 : INT_MAX;
       c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.alloc_count));
  }
   /*
       最终扫尾的工作,因为队列指令将会被缓存到抽象化的客户端对象之中
       因此这里做的就是更新统计数据
   */
   mc = c->mstate.commands+c->mstate.count;
   mc->cmd = c->cmd;
   mc->argc = c->argc;
   mc->argv = c->argv;
   mc->argv_len = c->argv_len;

   c->mstate.count++;
   c->mstate.cmd_flags |= cmd_flags;
   c->mstate.cmd_inv_flags |= ~cmd_flags;
   c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc;

   /*
       清理客户端传入的 argc, argv
       因为已经存储进入 mstate,即事务队列之中
    */
   c->argv = NULL;
   c->argc = 0;
   c->argv_len_sum = 0;
   c->argv_len = 0;
}

就可以发现,Redis 的事务实现,实际上非常清晰,就是客户端在服务端做一个登记,表示自身进入事务处理状态,随后服务端方面就开始对后续传入的指令展开进一步的记录工作。

这就引出了我们对于 EXEC
指令的分析工作,参考下面的内容。

具体分析 Redis EXEC 指令

/* src/multi.c */
void execCommand(client *c) {
   int j;
   robj **orig_argv;
   int orig_argc, orig_argv_len;
   struct redisCommand *orig_cmd;

   /* 客户端并没有处于事务状态下,返回 */
   if (!(c->flags & CLIENT_MULTI)) {
       addReplyError(c,"EXEC without MULTI");
       return;
  }

   /* 监视的数据键过期,状态变化,加上相关状态,后续停止事务执行 */
   if (isWatchedKeyExpired(c)) {
       c->flags |= (CLIENT_DIRTY_CAS);
  }

   /* 在 Watch 的数据键发生改变,事务内指令出现错误时,不执行事务 */
   if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) {
       if (c->flags & CLIENT_DIRTY_EXEC) {
           addReplyErrorObject(c, shared.execaborterr);
      } else {
           addReply(c, shared.nullarray[c->resp]);
      }

       discardTransaction(c);
       return;
  }

   uint64_t old_flags = c->flags;

   /*
       Redis 希望事务是短平快的,因此不会允许长期阻塞执行的指令存在,
       这点和 PostgreSQL 是很不一样的
    */
   c->flags |= CLIENT_DENY_BLOCKING;

   /* 取消客户端所有与 Watch 相关的数据键的监视状态 */
   unwatchAllKeys(c);

   /* 进行标记,代表 Redis 服务端在做执行工作 */
   server.in_exec = 1;

   orig_argv = c->argv;
   orig_argv_len = c->argv_len;
   orig_argc = c->argc;
   orig_cmd = c->cmd;

   /* 根据事务队列中存储的指令数量,初步构建反馈结果 */
   addReplyArrayLen(c,c->mstate.count);
   /* 逐条遍历指令 */
   for (j = 0; j < c->mstate.count; j++) {
       // ...

       /* 检测权限,因为 ACL 目前不在我们的关注范围内,故不讨论 */
       int acl_errpos;
       int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
       if (acl_retval != ACL_OK) {
           /* 不具有相关权限,则不执行 */
           // ...
      } else {
           /* 不具有相关权限,则执行之 */
           if (c->id == CLIENT_ID_AOF) {
               /*
                   为 AOF 日志而准备,则不向子节点传输日志,
                   也不记录 SlowLog 和 内存使用状态
               */
               call(c,CMD_CALL_NONE);
          }
           else {
               /* 按照一般指令办法对待之 */
               call(c,CMD_CALL_FULL);
          }

           // ...
      }

       // ...
  }

   /*
       倘若旧有标志位并不阻挡长执行指令,则不阻挡,因为事务结束
       又可以处理长周期指令了
   */
   if (!(old_flags & CLIENT_DENY_BLOCKING))
       c->flags &= ~CLIENT_DENY_BLOCKING;

   /* 扫尾工作,宣告客户端和服务端的事务状态结束 */
   c->argv = orig_argv;
   c->argv_len = orig_argv_len;
   c->argc = orig_argc;
   c->cmd = c->realcmd = orig_cmd;
   discardTransaction(c);

   server.in_exec = 0;
}

具体分析 Redis WATCH 指令

这样,我们也就对 Redis 的事务实现建立了一个基本的了解,但是仅仅是这两条指令还是不够的,因为我们还并不清楚什么是 Watch
指令,没有理解 Redis 是如何监测数据项的变化的,因此,请让我们将目光聚焦于 watchCommand
之上,参考下面的代码:

void watchCommand(client *c) {
   int j;

   /* Watch 只能够在事务之外做执行 */
   if (c->flags & CLIENT_MULTI) {
       addReplyError(c,"WATCH inside MULTI is not allowed");
       return;
  }
   /*
       如果已经有监测的数据键已经被修改,其它的随之无意义,
       直接反馈 OK
   */
   if (c->flags & CLIENT_DIRTY_CAS) {
       addReply(c,shared.ok);
       return;
  }
   /* 在服务端登记要监视的数据键,再反馈 OK */
   for (j = 1; j < c->argc; j++)
       watchForKey(c,c->argv[j]);
   addReply(c,shared.ok);
}

之后,我们继续深入阅读 watchForKey
函数:

void watchForKey(client *c, robj *key) {
   list *clients = NULL;
   listIter li;
   listNode *ln;
   watchedKey *wk;

   /* 服务端需要掌握需要监视数据键的客户端的数量 */
   if (listLength(c->watched_keys) == 0) server.watching_clients++;

   /*
       如果已经监视,则返回
       客户端需要监测的数据键存储于客户端对象自身上
       此处做的本质上就是列表遍历工作
   */
   listRewind(c->watched_keys,&li);
   while((ln = listNext(&li))) {
       wk = listNodeValue(ln);
       if (wk->db == c->db && equalStringObjects(key,wk->key))
           return;
  }
   /*
       Redis 按照 Database(命名空间)-Dict(动态哈希表)
       的方式组织数据与管理数据
       此处展开的就是登记工作
   */
   clients = dictFetchValue(c->db->watched_keys,key);
   if (!clients) {
       clients = listCreate();
       dictAdd(c->db->watched_keys,key,clients);
       incrRefCount(key);
  }
   /* wk 用于代指被监视的数据键 */
   wk = zmalloc(sizeof(*wk));
   wk->key = key;
   wk->client = c;
   wk->db = c->db;
   wk->expired = keyIsExpired(c->db, key);
   /* 增添引用计数,进入监视列表 */
   incrRefCount(key);
   listAddNodeTail(c->watched_keys, wk);
   watchedKeyLinkToClients(clients, wk);
}

而被监视的数据键,检测其是否改变,对应的函数即为 signalModifiedKey
,它在 Redis 将会调用得到的地方很多,而它的策略也非常简单,只要将要修订的键是被监测的键,那么就算做修订行为发生。

因此,即使我们将数据设置的一模一样,也没有任何的意义,如下所示:

(事务照样执行失败)

由此,我们也就通过三个基本的指令,对于 Redis 的轻量级事务模型建立了了解,它将对我们的后续工作提供有意义的指导。

写在最后

目前,我们正在推进“数据库内核一周一审”的建设,希望博众家之所长,打造一个相互学习,相互敬重的数据库内核研发生态,欢迎更多的朋友们参与到这项工作中来,齐心协力地推动中国数据库内核的发展。

感谢应急管理大学袁国铭主任、中国 PostgreSQL 分会魏波老师、王其达老师、开放原子开源基金会李少辉老师、张凯老师、李永老师、李明康老师原 PikiwiDB 社区于雨老师、Eagle 社区李明宇老师、青学会吴洋老师、IvorySQL 社区任娇老师、牛世继老师、阿里周正中老师、OpenTenBase 社区符芬菊老师、@红发哥、@Makio、@可可、@轩轩、@进化哥、@不语哥、@念念哥、@smx123、@奶啤、@拉黑哥、@奇迹银河狮子狗、@川渝卷王、@Cngal、@默然、@且听风吟的帮助和指导。



文章转载自青年数据库学习互助会,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论