暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

openGauss数据库源码解析系列文章—安全管理源码解析(三)

openGauss 2023-08-07
355

五、审计与追踪

审计机制和审计追踪机制能够对用户的日常行为进行记录和分析,实现规避风险、提高安全性。

5.1  审计日志设计

审计内容的记录方式通常有两种:记录到数据库的表中、记录到OS文件中。openGauss采用记录到OS文件中(即审计日志)的方式来保存审计结果,审计日志文件夹受操作系统权限保护,默认只有初始化用户可以读写,从数据库安全角度出发,保证了审计结果的可靠性。日志文件的存储目录由audit_directory参数指定。

openGauss审计日志每条记录包括time、type、result、userid、username、database、client_conninfo、object_name、detail_info、node_name、thread_id、local_port、remote_port共13个字段。图23为审计日志的单条记录示例。

图23  审计记录示例

对审计日志文件进行读写的函数的代码主要位于“pgaudit.cpp”文件中,其中主要包括两类函数:审计文件的读、写、更新函数;审计记录的增、删、查接口。

首先介绍审计文件的数据结构,如图24所示。

openGauss的审计日志采用文件的方式存储在指定目录中。通过查看目录,可以发现日志主要包括两类文件:形如0_adt的审计文件以及名为index_table索引文件。

图24  审计文件结构

以adt结尾的审计文件中,每一条审计记录对应一个AuditData结构体。数据结构AuditData代码如下:

    typedef struct AuditData {
       AuditMsgHdr header;    /*  记录文件头,存储记录的标识、大小等信息   */
       AuditType type;        /*  审计类型   */
       AuditResult result;      /*  执行结果   */
       char varstr[1];         *  二进制格式存储的具体审计信息   */
    } AuditData;

    其中AuditMsgHdr记录着审计记录的标识信息,数据结构AuditMsgHdr的代码如下:

      typedef struct AuditMsgHdr {
         char signature[2];   *  审计记录标识,目前固定为AUDIT前两个字符’A’和’U’  */
         uint16 version;      /*  版本信息,目前固定为0   */
         uint16 fields;       *  审计记录字段数,目前为13   */
         uint16 flags;        /*  记录有效性标识,如果被删除则标记为DEAD   */
         pg_time_t time;     *  审计记录创建时间   */
         uint32 size;         *  审计信息占字节长度   */
      } AuditMsgHdr;

      AuditData的其他结构存储着审计记录的审计信息,AuditType为审计类型,目前有38种类型。AuditResult为执行的结果,有AUDIT_UNKNOWN、AUDIT_OK、AUDIT_FAILED三种结果。其余的各项信息,均通过二进制的方式写入到varstr中。

      审计日志有关的另一个文件为索引文件index_table,其中记录着审计文件的数量、审计日志文件编号、审计文件修改日期等信息。其数据结构AuditIndexTable代码如下:

        typedef struct AuditIndexTable {
           uint32 maxnum;             *  审计目录下审计文件个数的最大值   */
           uint32 begidx;               *  审计文件开始编号   */
           uint32 curidx;                /*  当前使用的审计文件编号   */
           uint32 count;                 *  当前审计文件的总数   */
           pg_time_t last_audit_time;      /*  最后一次写入审计记录的时间   */
           AuditIndexItem data[1];        /*  审计文件指针   */
        } AuditIndexTable;

        索引文件中每一个AuditIndexItem对应一个审计文件,其数据结构AuditIndexItem的代码如下:

          typedef struct AuditIndexItem {
             pg_time_t ctime;             *  审计文件创建时间   */
             uint32 filenum;              /*  审计文件编号   */
             uint32 filesize;               *  审计文件占空间大小   */
          } AuditIndexItem;

          审计文件的读、写类函数如auditfile_open、auditfile_rotate等函数实现较简单,读者可以直接阅读源码。

          下面主要介绍日志文件的结构和日志记录的增、删、查接口。

          审计记录的写入接口为audit_report函数。该函数的原型为:

            void audit_report(AuditType type, AuditResult result, const char* object_name, const char* detail_info);

            其中入参type、result、object_name、detail_info分别对应审计日志记录中的相应字段,审计日志中的其余9个字段均为函数在执行时从全局变量中获取。

            audit_report函数的执行主要分为3个部分,首先会检查审计的各项开关,判断是否需要审计该操作;然后根据传入的参数、全局变量中的参数以及当前时间,生成审计日志所需的信息并拼接成字符串;最后调用审计日志文件读写接口,将审计日志写入文件中。

            审计记录查询接口为pg_query_audit函数,该函数为数据库内置函数,可供用户直接调用,调用形式为:

              SELECT * FROM pg_query_audit (timestamptz startime,timestamptz endtime, audit_log);

              入参为需要查询审计记录的起始时间和终止时间以及审计日志文件所在的物理路径。当不指定audit_log时,默认查看连接当前实例的审计日志信息。

              审计记录的删除接口为pg_delete_audit函数,该函数为数据库内置函数,可供用户直接调用,调用形式为:

                SELECT * FROM pg_delete_audit (timestamptz startime,timestamptz endtime);

                入参为需要被删除审计记录的起始时间和终止时间。该函数通过调用pgaudit_delete_file函数来将审计日志文件中,startime与endtime之间的审计记录标记为AUDIT_TUPLE_DEAD,达到删除审计日志的效果,而不实际删除审计记录的物理数据。带来的效果是执行该函数审计日志文件大小不会减小。

                5.2  审计执行

                1. 执行原理

                审计机制是openGauss的内置安全能力之一,openGauss提供对用户发起的SQL行为审计和追踪能力,支持针对DDL、DML语句和关键行为(登录、退出、系统启动、恢复)的审计。在每个工作线程初始化阶段把审计模块加载至线程中,其审计的执行原理是把审计函数赋给SQL生命周期不同阶段的Hook(钩子),当线程执行至SQL处理流程的特定阶段后会进行审计执行判定逻辑。审计模块加载关键代码如下:

                  void pgaudit_agent_init(void) {

                     /* DDL、DML语句审计Hook赋值, 赋值结束后标识审计模块已在此线程加载 */
                     prev_ExecutorEnd = ExecutorEnd_hook;
                     ExecutorEnd_hook = pgaudit_ExecutorEnd;
                     prev_ProcessUtility = ProcessUtility_hook;
                     ProcessUtility_hook = (ProcessUtility_hook_type)pgaudit_ProcessUtility;
                     u_sess->exec_cxt.g_pgaudit_agent_attached = true;
                  }

                  SQL语句在执行到ProcessUtility_hook和ExecutorEnd_hook函数指针时,会分别进入到已预置好的审计流程中。这两个函数指针的位置在SQL进入执行器执行之前,具体关系如图25所示。

                  图25  审计执行关系图

                  如图25所示,在线程初始化阶段审计模块已加载完毕。SQL经过优化器得到计划树,此时审计模块的pgaudit_ExecutorEnd函数和pgaudit_ProcessUtility函数分别进行DML和DDL语句的分析,如果和已设置审计策略相匹配,则会调用审计日志接口,生成对应的审计日志。对于系统变更类的审计直接内置于相应行为的内核代码中。

                  2. 关键执行流程

                  1) 系统变更类审计执行

                    pgaudit_system_recovery_ok
                    pgaudit_system_start_ok
                    pgaudit_system_stop_ok
                    pgaudit_user_login
                    pgaudit_user_logout
                    pgaudit_system_switchover_ok
                    pgaudit_user_no_privileges
                    pgaudit_lock_or_unlock_user

                    以上为openGauss支持系统变更类的审计执行函数,对于此类审计函数均嵌入内核相应调用流程中,下面以审计用户登入退出pgaudit_user_login函数为例说明其主体流程。

                    图26  登入审计执行流程

                    图26为服务端校验客户端登入时的主要流程。以登录失败场景为例,首先根据配置文件和客户端IP和用户信息确认采用的认证方式(包括sha256和SSL认证等);然后根据不同的认证方式采用不同的认证流程和客户端进行交互完成认证身份流程;如果认证失败,则线程进入退出流程上报客户端,此时调用pgaudit_user_login获取当前访问数据库名称和详细信息,并记录登录失败相关的审计日志。关键代码如下:

                      /* 拼接登录失败时候的详细信息,包括数据库名称和用户名 */
                      rc = snprintf_s(details,
                      PGAUDIT_MAXLENGTH,
                         PGAUDIT_MAXLENGTH - 1,
                         "login db(%s)failed,authentication for user(%s)failed",
                         port->database_name,
                         port->user_name);
                      securec_check_ss(rc, "\0", "\0");
                      /* 调用登入审计函数,记录审计日志 */
                      pgaudit_user_login(FALSE, port->database_name, details);
                      /* 退出当前线程 */
                      ereport(FATAL, (errcode(errcode_return), errmsg(errstr, port->user_name)))

                      登入审计日志接口pgaudit_user_login则主要完成审计日志记录接口需要参数的拼接,相关代码如下:

                        void pgaudit_user_login(bool login_ok, const char* object_name, const char* detaisinfo)
                        {
                           AuditType audit_type;
                           AuditResult audit_result;
                           Assert(detaisinfo);
                           /* 审计类型和审计结果拼装 */
                           if (login_ok) {
                               audit_type = AUDIT_LOGIN_SUCCESS;
                               audit_result = AUDIT_OK;
                           } else {
                               audit_type = AUDIT_LOGIN_FAILED;
                               audit_result = AUDIT_FAILED;
                           }
                           /* 直接调用审计日志记录接口 */
                           audit_report(audit_type, audit_result, object_name, detaisinfo);
                        }

                        2) DDL、DML语句审计执行

                        依据“1. 执行原理”节的描述,DDL、DML语句的执行分别由于pgaudit_ProcessUtility函数、pgaudit_ExecutorEnd函数来承载。此处首先介绍函数pgaudit_ProcessUtility函数,其主体结构代码如下:

                          static void pgaudit_ProcessUtility(Node* parsetree, const char* queryString, ...)
                          {
                          /*  适配不同编译选项  */
                          ...
                          /*  开始匹配不同的DDL语句  */
                          switch (nodeTag(parsetree)) {
                             case T_CreateStmt: {
                                 /* CREATE table语句审计执行 */
                                 CreateStmt* createtablestmt = (CreateStmt*)(parsetree);
                                 pgaudit_ddl_table(createtablestmt->relation->relname, queryString);
                             } break;
                             case T_AlterTableStmt: {
                                 AlterTableStmt* altertablestmt = (AlterTableStmt*)(parsetree); * Audit alter table */
                                 if (altertablestmt->relkind == OBJECT_SEQUENCE) {
                                     pgaudit_ddl_sequence(altertablestmt->relation->relname, queryString);
                                 } else {
                                     pgaudit_ddl_table(altertablestmt->relation->relname, queryString);
                                 }
                             } break;
                             /*  匹配其他DDL类型语句逻辑  */
                             ...
                          }}

                          DDL审计执行函数关键入参parsetree用于识别审计日志类型(create/drop/alter等操作)。入参queryString保存原始执行SQL语句,用于记录审计日志,略去非关键流程。此函数主要根据判断nodeTag所归属的DDL操作类型,进入不同的审计执行逻辑。以T_CreateStmt为例,识别当前语句CREATE table则进入pgaudit_ddl_table逻辑进行审计日志执行并最终记录审计日志。

                          图27  DDL审计执行流程

                          如图27所示,首先从当前SQL语句中获取执行对象类别校验其相应的审计开关是否开启(可以通过GUC参数audit_system_object控制)。当前支持开启的全量对象代码如下:

                            typedef enum {
                            DDL_DATABASE = 0,
                            DDL_SCHEMA,
                            DDL_USER,
                            DDL_TABLE,
                            DDL_INDEX,
                            DDL_VIEW,
                            DDL_TRIGGER,
                            DDL_FUNCTION,
                            DDL_TABLESPACE,
                            DDL_RESOURCEPOOL,
                            DDL_WORKLOAD,
                            DDL_SERVERFORHADOOP,
                            DDL_DATASOURCE,
                            DDL_NODEGROUP,
                            DDL_ROWLEVELSECURITY,
                            DDL_TYPE,
                            DDL_TEXTSEARCH,
                            DDL_DIRECTORY,
                            DDL_SYNONYM
                            } DDLType;

                            如果DDL操作的对象审计已开启则进行审计日志记录流程,在调用审计日志记录函数audit_report之前需要对包含密码的SQL语句进行脱敏处理。将包含密码的语句中(CREATE role/user)密码替换成‘********’用于隐藏敏感信息,至此针对CREATE DDL语句的审计执行完成。其他类型DDL语句主体流程一致,不做赘述。

                            下面介绍针对DML语句审计执行逻辑pgaudit_ExecutorEnd函数,整体调用流程如图28所示。

                            图28  DML审计执行流程

                            首先判断SQL查询语句所归属的查询类型。以CMD_SELECT类型为例,先获取查询对象的object_name用于审计日志记录中访问对象的记录,然后调用pgaudit_dml_table函数。相关代码如下:

                              case CMD_SELECT:
                              object_name = pgaudit_get_relation_name(queryDesc->estate->es_range_table);
                              pgaudit_dml_table_select(object_name, queryDesc->sourceText);

                              和DDL的记录一样,同样会对敏感信息进行脱敏后调用审计日志记录接口audit_report,至此对DML语句的审计日志执行完成。

                              六、数据安全技术

                              openGauss采用了多种加密解密技术来提升数据在各个环节的安全性。

                              6.1  数据加解密接口

                              用户在使用数据库时,除了需要基本的数据库安全之外,还会对导入的数据进行加密和解密的操作。openGauss提供了针对用户导入数据进行加密和解密的功能接口,用户使用该接口可以对其认为包含敏感信息的数据进行加密和解密操作。

                              1. 数据加密接口

                              openGauss提供的加密功能是基于标准的AES128加密算法进行实现,提供的加密接口函数为:

                                gs_encrypt_aes128 (encryptstr, keystr)

                                其中keystr是用户提供的密钥明文,加密函数通过标准的AES128加密算法对encryptstr字符串进行加密,并返回加密后的字符串。keystr的长度范围为1~16字节。加密函数支持的加密数据类型包括数值类型、字符类型、二进制类型中的RAW、日期/时间类型中的DATE、TIMESTAMP、SMALLDATETIME等。

                                加密函数返回的的密文值长度:至少为92字节,不超过4*[(Len+68)/3]字节,其中Len为加密前数据长度(单位为字节)。

                                使用示例如下:

                                  opengauss=# CREATE table student005 (name text);
                                  opengauss=# INSERT into student005 values(gs_encrypt_aes128('zhangsan','gaussDB123'));
                                  INSERT 0 1
                                  opengauss=# SELECT * FROM student005;
                                                                              name
                                  ----------------------------------------------------------------------------------------------
                                  NrGJdx8pDgvUSE2NN7eM5mFDnSSJ41fq31/0SI2+4kABgOnCu9H2vkjpvcAdG/AhJ8OrBn906Xaj6oqyEHsTbcTvjrU=
                                  (1 row)

                                  加密接口函数是通过函数gs_encrypt_aes128实现的,其代码源文件为:“builtins.h”和“cipherfn.cpp”。

                                  该函数是一个openGauss的存储过程函数,通过用户输入的明文和密钥进行数据的加密操作。

                                  主要流程如图29所示。

                                  图29  数据加密流程

                                  数据加密的代码如下逐个部分介绍。

                                  开始将明文转换为密文过程,相关代码如下:

                                    bool gs_encrypt_aes_speed (GS_UCHAR* plaintext, GS_UCHAR* key, GS_UCHAR* ciphertext, GS_UINT32* cipherlen)
                                    ……

                                    获取随机salt值,获取派生密钥,相关代码如下:

                                      /* bool gs_encrypt_aes_speed函数:  */
                                      /* 使用存在的随机salt值  */
                                         static THR_LOCAL GS_UCHAR random_salt_saved[RANDOM_LEN] = {0};
                                         static THR_LOCAL bool random_salt_tag = false;
                                         static THR_LOCAL GS_UINT64 random_salt_count = 0;


                                         /* 对随机salt值的使用次数限制  */
                                         const GS_UINT64 random_salt_count_max = 24000000;


                                         if (random_salt_tag == false || random_salt_count > random_salt_count_max) {
                                             /* 加密获取随机salt值  */
                                             retval = RAND_bytes(init_rand, RANDOM_LEN);
                                             if (retval != 1) {
                                                 (void)fprintf(stderr, _("generate random key failed,errcode:%u\n"), retval);
                                                 return false;
                                             }
                                             random_salt_tag = true;
                                             errorno = memcpy_s(random_salt_saved, RANDOM_LEN, init_rand, RANDOM_LEN);
                                             securec_check(errorno, "\0", "\0");
                                             random_salt_count = 0;
                                         } else {
                                             errorno = memcpy_s(init_rand, RANDOM_LEN, random_salt_saved, RANDOM_LEN);
                                             securec_check(errorno, "\0", "\0");
                                             random_salt_count++;
                                         }


                                         plainlen = strlen((const char*)plaintext);

                                      存储用户用户密钥和派生密钥以及salt值。相关代码如下:

                                        bool aes128EncryptSpeed(GS_UCHAR* PlainText, GS_UINT32 PlainLen, GS_UCHAR* Key, GS_UCHAR* RandSalt,
                                        GS_UCHAR* CipherText, GS_UINT32* CipherLen)
                                        {
                                        ……
                                        /* 如果随机salt和key没有更新就使用已经存在的派生key,否则就生成新的派生key ’  */


                                           if (0 == memcmp(RandSalt, random_salt_saved, RANDOM_LEN)) {
                                               retval = 1;
                                               /* 掩码保存用户key和派生key  */
                                               for (GS_UINT32 i = 0; i < RANDOM_LEN; ++i) {
                                                   if (user_key[i] == ((char)input_saved[i] ^ (char)random_salt_saved[i])) {
                                                       derive_key[i] = ((char)derive_vector_saved[i] ^ (char)random_salt_saved[i]);
                                                       mac_key[i] = ((char)mac_vector_saved[i] ^ (char)random_salt_saved[i]);
                                                   } else {
                                                       retval = 0;
                                                   }
                                               }
                                           }


                                           if (!retval) {
                                               retval = PKCS5_PBKDF2_HMAC(
                                                   (char*)Key, keylen, RandSalt, RANDOM_LEN, ITERATE_TIMES, (EVP_MD*)EVP_sha256(), RANDOM_LEN, derive_key);
                                               if (!retval) {
                                                   (void)fprintf(stderr, _("generate the derived key failed,errcode:%u\n"), retval);
                                                   ……
                                                   return false;
                                               }


                                               /* 为hmac生成mac key  */
                                               retval = PKCS5_PBKDF2_HMAC((char*)user_key,
                                                   RANDOM_LEN,
                                                   RandSalt,
                                                   RANDOM_LEN,
                                                   MAC_ITERATE_TIMES,
                                                   (EVP_MD*)EVP_sha256(),
                                                   RANDOM_LEN,
                                                   mac_key);
                                               if (!retval) {
                                                   (void)fprintf(stderr, _("generate the mac key failed,errcode:%u\n"), retval);
                                                   ……
                                                   return false;
                                               }


                                               /* 存储随机salt  */
                                               errorno = memcpy_s(random_salt_saved, RANDOM_LEN, RandSalt, RANDOM_LEN);
                                               securec_check_c(errorno, "\0", "\0");


                                               /* 使用随机salt为存储的user key、派生key和mac key做掩码处理  */
                                               for (GS_UINT32 i = 0; i < RANDOM_LEN; ++i) {
                                                   input_saved[i] = ((char)user_key[i] ^ (char)random_salt_saved[i]);
                                                   derive_vector_saved[i] = ((char)derive_key[i] ^ (char)random_salt_saved[i]);
                                                   mac_vector_saved[i] = ((char)mac_key[i] ^ (char)random_salt_saved[i]);
                                               }
                                        }
                                        }

                                        使用派生密钥去加密明文。相关代码如下:

                                          GS_UINT32 CRYPT_encrypt(GS_UINT32 ulAlgId, const GS_UCHAR* pucKey, GS_UINT32 ulKeyLen, const GS_UCHAR* pucIV,
                                          GS_UINT32 ulIVLen, GS_UCHAR* pucPlainText, GS_UINT32 ulPlainLen, GS_UCHAR* pucCipherText, GS_UINT32* pulCLen)
                                          ……
                                             cipher = get_evp_cipher_by_id(ulAlgId);
                                             if (cipher == NULL) {
                                                 (void)fprintf(stderr, ("invalid ulAlgType for cipher,please check it!\n"));
                                                 return 1;
                                             }
                                             ctx = EVP_CIPHER_CTX_new();
                                             if (ctx == NULL) {
                                                 (void)fprintf(stderr, ("ERROR in EVP_CIPHER_CTX_new:\n"));
                                                 return 1;
                                             }
                                             EVP_CipherInit_ex(ctx, cipher, NULL, pucKey, pucIV, 1);


                                             /* 开启填充模式  */
                                             EVP_CIPHER_CTX_set_padding(ctx, 1);
                                             /* 处理最后一个block  */
                                             blocksize = EVP_CIPHER_CTX_block_size(ctx);
                                             if (blocksize == 0) {
                                                 (void)fprintf(stderr, ("invalid blocksize, ERROR in EVP_CIPHER_CTX_block_size\n"));
                                                 return 1;
                                          }


                                             nInbufferLen = ulPlainLen % blocksize;
                                             padding_size = blocksize - nInbufferLen;
                                             pchInbuffer = (unsigned char*)OPENSSL_malloc(blocksize);
                                             if (pchInbuffer == NULL) {
                                                 (void)fprintf(stderr, _("malloc failed\n"));
                                                 return 1;
                                             }
                                             /* 第一个字节使用“0x80”去填充,其他的使用“0x00”填充  */
                                             rc = memcpy_s(pchInbuffer, blocksize, pucPlainText + (ulPlainLen - nInbufferLen), nInbufferLen);
                                             securec_check_c(rc, "\0", "\0");
                                             rc = memset_s(pchInbuffer + nInbufferLen, padding_size, 0, padding_size);
                                             securec_check_c(rc, "\0", "\0");
                                             pchInbuffer[nInbufferLen] = 0x80;


                                             EVP_CIPHER_CTX_set_padding(ctx, 0);

                                          将加密信息加入密文头方便解密,并转换加密信息为可见的脱敏模式encode。相关代码如下:

                                            /*将init rand添加到密文的头部进行解密使用 */
                                               GS_UCHAR mac_temp[MAC_LEN] = {0};
                                               errorno = memcpy_s(mac_temp, MAC_LEN, ciphertext + *cipherlen - MAC_LEN, MAC_LEN);
                                               securec_check(errorno, "\0", "\0");
                                               errorno = memcpy_s(ciphertext + *cipherlen - MAC_LEN + RANDOM_LEN, MAC_LEN, mac_temp, MAC_LEN);
                                               securec_check(errorno, "\0", "\0");


                                               GS_UCHAR temp[RANDOM_LEN] = {0};
                                               for (GS_UINT32 i = (*cipherlen - MAC_LEN) RANDOM_LEN; i >= 1; --i) {
                                                   errorno = memcpy_s(temp, RANDOM_LEN, ciphertext + (i - 1) * RANDOM_LEN, RANDOM_LEN);
                                                   securec_check(errorno, "\0", "\0");


                                                   errorno = memcpy_s(ciphertext + i * RANDOM_LEN, RANDOM_LEN, temp, RANDOM_LEN);
                                                   securec_check(errorno, "\0", "\0");
                                               }
                                               errorno = memcpy_s(ciphertext, RANDOM_LEN, init_rand, RANDOM_LEN);
                                               securec_check(errorno, "\0", "\0");
                                               *cipherlen = *cipherlen + RANDOM_LEN;
                                               errorno = memset_s(temp, RANDOM_LEN, '\0', RANDOM_LEN);
                                            securec_check(errorno, "\0", "\0");
                                            ……
                                            /*对密文进行编码,以实现良好的显示和解密操作*/
                                               encodetext = SEC_encodeBase64((char*)ciphertext, ciphertextlen);

                                            至此完成加密过程。

                                            2. 数据解密接口

                                            openGauss提供的解密接口函数为:

                                              gs_decrypt_aes128 (decryptstr, keystr)

                                              以keystr为用户加密密钥对decryptstr加密字符串进行解密,返回解密后的字符串。解密使用的keystr必须保证与加密时使用的keystr一致才能正常解密。keystr不得为空。

                                              使用示例如下。

                                                opengauss=# SELECT gs_decrypt_aes128(name,'gaussDB123') FROM student005;
                                                gs_decrypt_aes128
                                                -------------------
                                                zhangsan
                                                (1 row)

                                                解密接口函数是通过函数gs_decrypt_aes128实现的,其代码源文件为:“builtins.h”和“cipherfn.cpp”。

                                                该函数是一个openGauss的存储过程函数,通过用户输入的密文(注明文加密生成的密文)和密钥进行数据的解密操作。

                                                主要流程如图30所示。

                                                图30  数据解密流程

                                                数据解密的代码如下逐个部分介绍。

                                                通过存储过程的入参解析出需要解密的密文和密钥,并进行脱敏的decode操作。相关代码如下:

                                                  decodetext = (GS_UCHAR*)(text_to_cstring(PG_GETARG_TEXT_P(0)));
                                                     key = (GS_UCHAR*)(text_to_cstring(PG_GETARG_TEXT_P(1)));
                                                     keylen = strlen((const char*)key);


                                                     /*为解密操作去做密文解码 */
                                                     ciphertext = (GS_UCHAR*)(SEC_decodeBase64((char*)decodetext, &decodetextlen));
                                                     if ((ciphertext == NULL) || (decodetextlen <= RANDOM_LEN)) {
                                                         if (ciphertext != NULL) {
                                                             OPENSSL_free(ciphertext);
                                                             ciphertext = NULL;
                                                         }
                                                         errorno = memset_s(decodetext, decodetextlen, '\0', decodetextlen);
                                                         securec_check(errorno, "\0", "\0");
                                                         pfree_ext(decodetext);
                                                         errorno = memset_s(key, keylen, '\0', keylen);
                                                         securec_check(errorno, "\0", "\0");
                                                         pfree_ext(key);
                                                         ereport(ERROR,
                                                             (errcode(ERRCODE_EXTERNAL_ROUTINE_INVOCATION_EXCEPTION),
                                                                 errmsg("Decode the cipher text failed or the ciphertext is too short!")));
                                                     }
                                                     errorno = memset_s(decodetext, decodetextlen, '\0', decodetextlen);
                                                     securec_check(errorno, "\0", "\0");
                                                     pfree_ext(decodetext);


                                                     plaintext = (GS_UCHAR*)palloc(decodetextlen);
                                                     errorno = memset_s(plaintext, decodetextlen, '\0', decodetextlen);
                                                     securec_check(errorno, "\0", "\0");

                                                  开始将密文转换为明文过程。相关代码如下:

                                                    bool gs_decrypt_aes_speed(
                                                    GS_UCHAR* ciphertext, GS_UINT32 cipherlen, GS_UCHAR* key, GS_UCHAR* plaintext, GS_UINT32* plainlen)
                                                    ……

                                                    将密文进行解码操作,分离密文和信息两个部分。相关代码如下:

                                                      bool aes128DecryptSpeed(GS_UCHAR* CipherText, GS_UINT32 CipherLen, GS_UCHAR* Key, GS_UCHAR* RandSalt,
                                                      GS_UCHAR* PlainText, GS_UINT32* PlainLen)
                                                      ……
                                                         /*将密文分成,密文部分、AES向量部分和mac向量部分进行解密操作*/
                                                         GS_UINT32 cipherpartlen = CipherLen - RANDOM_LEN - MAC_LEN;
                                                         errorno = memcpy_s(aes_vector, RANDOM_LEN, CipherText + cipherpartlen, RANDOM_LEN);
                                                         securec_check_c(errorno, "", "");
                                                         errorno = memcpy_s(mac_text_saved, MAC_LEN, CipherText + cipherpartlen + RANDOM_LEN, MAC_LEN);
                                                         securec_check_c(errorno, "", "");


                                                         static THR_LOCAL GS_UINT32 usage_frequency[NUMBER_OF_SAVED_DERIVEKEYS] = {0};


                                                      /* insert_position是用来分隔两个不同的区域的usage_frequency */  
                                                      static THR_LOCAL GS_UINT32 insert_position = NUMBER_OF_SAVED_DERIVEKEYS 2;


                                                         /* 初始化usage_frequency */
                                                         if (usage_frequency[0] == 0 && usage_frequency[NUMBER_OF_SAVED_DERIVEKEYS - 1] == 0) {
                                                             for (GS_UINT32 i = 0; i < NUMBER_OF_SAVED_DERIVEKEYS; ++i)
                                                                 usage_frequency[i] = i;
                                                         }


                                                         errorno = memcpy_s(user_key, RANDOM_LEN, Key, keylen);
                                                         securec_check_c(errorno, "\0", "\0");
                                                         if (keylen < RANDOM_LEN) {
                                                             errorno = memset_s(user_key + keylen, RANDOM_LEN - keylen, '\0', RANDOM_LEN - keylen);
                                                             securec_check_c(errorno, "\0", "\0");
                                                         }


                                                      /*按照使用频率进行顺序查找对应的派生向量*/
                                                         for (GS_UINT32 i = 0; i < NUMBER_OF_SAVED_DERIVEKEYS && !DERIVEKEY_FOUND; ++i) {
                                                             if (0 == memcmp(random_salt_used[usage_frequency[i]], RandSalt, RANDOM_LEN)) {
                                                                 DERIVEKEY_FOUND = 1;
                                                                 for (GS_UINT32 j = 0; j < RANDOM_LEN; ++j) {
                                                                     GS_UCHAR mask = (char)random_salt_used[usage_frequency[i]][j];
                                                                     if (user_key[j] == ((char)user_input_used[usage_frequency[i]][j] ^ (char)mask)) {
                                                                         decrypt_key[j] = ((char)derive_vector_used[usage_frequency[i]][j] ^ (char)mask);
                                                                         mac_key[j] = ((char)mac_vector_used[usage_frequency[i]][j] ^ (char)mask);
                                                                     } else {
                                                                         DERIVEKEY_FOUND = 0;
                                                                     }
                                                                 }
                                                                 if (i > 0 && i < NUMBER_OF_SAVED_DERIVEKEYS / 2 && DERIVEKEY_FOUND) {
                                                                     GS_UINT32 temp = usage_frequency[i - 1];
                                                                     usage_frequency[i - 1] = usage_frequency[i];
                                                                     usage_frequency[i] = temp;
                                                                 } else if (i >= NUMBER_OF_SAVED_DERIVEKEYS / 2 && DERIVEKEY_FOUND) {
                                                                   
                                                                     GS_UINT32 temp = usage_frequency[NUMBER_OF_SAVED_DERIVEKEYS / 2 - 1];
                                                                     usage_frequency[NUMBER_OF_SAVED_DERIVEKEYS / 2 - 1] = usage_frequency[i];
                                                                     usage_frequency[i] = temp;
                                                                 } else {
                                                                     ;
                                                                 }
                                                             }
                                                         }


                                                         /* 如果没有派生向量存在,就生成新的派生key */
                                                         if (!DERIVEKEY_FOUND) {
                                                             retval = PKCS5_PBKDF2_HMAC(
                                                                 (char*)Key, keylen, RandSalt, RANDOM_LEN, ITERATE_TIMES, (EVP_MD*)EVP_sha256(), RANDOM_LEN, decrypt_key);
                                                             if (!retval) {
                                                                 ……
                                                                 return false;
                                                             }
                                                             retval = PKCS5_PBKDF2_HMAC((char*)user_key,
                                                                 RANDOM_LEN,
                                                                 RandSalt,
                                                                 RANDOM_LEN,
                                                                 MAC_ITERATE_TIMES,
                                                                 (EVP_MD*)EVP_sha256(),
                                                                 RANDOM_LEN,
                                                                 mac_key);


                                                             if (!retval) {
                                                                 ……
                                                                 return false;
                                                             }


                                                             errorno = memcpy_s(random_salt_used[usage_frequency[insert_position]], RANDOM_LEN, RandSalt, RANDOM_LEN);
                                                             securec_check_c(errorno, "\0", "\0");


                                                             for (GS_UINT32 j = 0; j < RANDOM_LEN; ++j) {
                                                                 GS_UCHAR mask = random_salt_used[usage_frequency[insert_position]][j];
                                                                 user_input_used[usage_frequency[insert_position]][j] = ((char)user_key[j] ^ (char)mask);
                                                                 derive_vector_used[usage_frequency[insert_position]][j] = ((char)decrypt_key[j] ^ (char)mask);
                                                                 mac_vector_used[usage_frequency[insert_position]][j] = ((char)mac_key[j] ^ (char)mask);
                                                             }


                                                             insert_position = (insert_position + 1) % (NUMBER_OF_SAVED_DERIVEKEYS / 2) + NUMBER_OF_SAVED_DERIVEKEYS / 2;
                                                         }

                                                      使用派生密钥去解密密文。相关代码如下:

                                                        GS_UINT32 CRYPT_decrypt(GS_UINT32 ulAlgId, const GS_UCHAR* pucKey, GS_UINT32 ulKeyLen, const GS_UCHAR* pucIV,
                                                        GS_UINT32 ulIVLen, GS_UCHAR* pucCipherText, GS_UINT32 ulCLen, GS_UCHAR* pucPlainText, GS_UINT32* pulPLen)
                                                        ……
                                                        cipher = get_evp_cipher_by_id(ulAlgId);
                                                           if (cipher == NULL) {
                                                               (void)fprintf(stderr, ("invalid ulAlgType for cipher,please check it!\n"));
                                                               return 1;
                                                           }
                                                           ctx = EVP_CIPHER_CTX_new();
                                                           if (ctx == NULL) {
                                                               (void)fprintf(stderr, ("ERROR in EVP_CIPHER_CTX_new:\n"));
                                                               return 1;
                                                           }
                                                           EVP_CipherInit_ex(ctx, cipher, NULL, pucKey, pucIV, 0);
                                                           EVP_CIPHER_CTX_set_padding(ctx, 0);
                                                           if (!EVP_DecryptUpdate(ctx, pucPlainText, &dec_num, pucCipherText, ulCLen)) {
                                                               (void)fprintf(stderr, ("ERROR in EVP_DecryptUpdate\n"));
                                                               goto err;
                                                           }
                                                           *pulPLen = dec_num;
                                                           if (!EVP_DecryptFinal(ctx, pucPlainText + dec_num, &dec_num)) {
                                                               (void)fprintf(stderr, ("ERROR in EVP_DecryptFinal\n"));
                                                               goto err;
                                                           }
                                                           *pulPLen += dec_num;


                                                           /* padding bytes of the last block need to be removed */
                                                           blocksize = EVP_CIPHER_CTX_block_size(ctx);

                                                        至此完成解密过程。

                                                        6.2  数据动态脱敏

                                                        数据脱敏,顾名思义就是将敏感数据通过变形、屏蔽等方式处理,其目的是保护隐私数据信息,防止数据泄露和恶意窥探。当企业或者机构收集用户个人身份数据、手机、银行卡号等敏感信息,然后将数据通过导出(非生产环境)或直接查询(结合生产环境)的方式投入使用时,按照隐私保护相关法律法规需将数据进行“脱敏”处理。

                                                        openGauss实现了数据动态脱敏机制,它根据一系列用户配置的“脱敏策略”来对查询命令进行分析匹配,最终将敏感数据屏蔽并返回。使用数据动态脱敏特性总的来说分为两个步骤:配置脱敏策略、触发脱敏策略。本小节将对这两个步骤进行具体分析。

                                                        显然只有在配置脱敏策略后系统才能有根据地进行敏感数据脱敏。openGauss提供了脱敏策略配置(创建、修改、删除)语法,这些语法所涉及的语法解析节点内容大致相同,因此这里仅对创建策略相关数据结构进行分析,其余不再赘述。下面将结合一个具体示例对数据动态脱敏特性进行详细介绍。

                                                        表6给出了一张包含敏感信息(薪资、银行卡号)的个人信息表,策略管理员要对该表中的敏感信息创建脱敏策略:当用户user1或user2在IP地址10.123.123.123上使用JDBC或gsql连接数据库并查询个人信息表时,系统将自动屏蔽敏感信息。

                                                        表6  个人信息表person

                                                        id

                                                        name

                                                        gender

                                                        salary

                                                        creditcards

                                                        1

                                                        张三

                                                        10000

                                                        6210630600006321083

                                                        2

                                                        李四

                                                        15000

                                                        6015431250003215514

                                                        3

                                                        王五

                                                        20000

                                                        5021134522201529881

                                                        首先策略管理员需要对敏感列打标签,随后使用标签创建脱敏策略,策略配置DDL语句如下。

                                                        例1脱敏策略配置示例。

                                                        配置资源标签:

                                                        (1) CREATE RESOURCE LABEL salary_label ADD COLUMN(person.salary);

                                                        (2) CREATE RESOURCE LABEL creditcard_label ADD COLUMN(person.creditcards);

                                                        配置脱敏策略:

                                                        (3) CREATE MASKING POLICY mask_person_policy MASKALL ON LABEL(salary_label), CREDITCARDMASKING ON label(creditcard_label) FILTER ON ROLES(user1,user2), IP(‘10.123.123.123’), APP(jdbc, gsql);

                                                        user1在10.123.123.123地址使用gsql查询敏感数据:

                                                        (4) SELECT id, salary, creditcards FROM public.person;

                                                        下面将对“CREATE MASKING POLICY”语句所涉及的语法结构定义进行逐一介绍。

                                                        数据结构CreateMaskingPolicyStmt代码如下:

                                                          typedef struct CreateMaskingPolicyStmt
                                                          {
                                                          NodeTag     type;
                                                          char        *policy_name;    /*  脱敏策略名称  */
                                                          List        *policy_data;    /*  脱敏策略行为  */
                                                          List        *policy_filters; /*  用户过滤条件  */
                                                          bool        policy_enabled;  /*  策略开关  */
                                                          } CreateMaskingPolicyStmt;

                                                          脱敏策略创建语法是对CreateMaskingPolicyStmt函数进行填充,其中policy_data是由若干DefElem节点组成的List,每个DefElem指出了以何种方式脱敏数据库资源,DefElem->name标识脱敏方法,DefElem->arg代表脱敏对象。

                                                          “6.2  数据动态脱敏”小节中例1脱敏策略配置示例的步骤0对应的policy_data组织结构如图31所示。

                                                          图31  脱敏策略配置示例对应的policy_data组织结构

                                                          policy_filters属性通过二叉逻辑树的形式描述了哪些用户场景(用户名、客户端、登录IP)可以使脱敏策略生效,policy_filters指向了逻辑树的根节点,只有当用户信息与逻辑树匹配时(匹配方式详见图35),脱敏策略才会被触发。逻辑树节点结构如下所示:

                                                            typedef struct PolicyFilterNode
                                                            {
                                                            NodeTag     type;
                                                            char        *node_type;  /*  逻辑操作类型,取值为“op”或“filter”  */
                                                            char        *op_value;   /*  逻辑操作符,仅当node_type为op时取值为“and”或“or”,否则为NULL  */
                                                            char        *filter_type;/*  过滤数据类型,仅当node_type为filter时取值为“APP”、“ROLES”、“IP”  */
                                                            List        *values;     /*  过滤数据值List,指出具体的过滤条件值,若node_type为op时置NULL  */
                                                            Node        *left;       /*  左子树  */
                                                            Node        *right;      /*  右子树  */
                                                            } PolicyFilterNode;

                                                            逻辑树节点分为操作符(op)节点和过滤数据(filter)节点。当op节点分为“与”或“或”关系,其op_value将置为“and”或“or”,其左右子树代表操作符左右子表达式。filter节点一般作为op的叶子节点出现,它标识具体的过滤信息并将其值存放在values链表中。需要注意的是,一个节点不可能既是op节点又是filter节点。“6.2  数据动态脱敏”小节中例1脱敏策略配置示例的步骤0对应的policy_filters组织结构如图32所示。

                                                            图32  配置脱敏策略对应的policy_filters 组织结构

                                                            脱敏策略配置的总体流程如图33所示。

                                                            图33  脱敏策略配置流程图

                                                            在查询编译脱敏策略配置SQL之后将进入策略增删改主函数中,首先会根据语法解析节点校验相关参数的合法性,做如下检查:

                                                            (1) 检查脱敏策略指定的数据库资源是否存在。

                                                            (2) 检查脱敏函数是否存在。

                                                            (3) 检查脱敏策略是否已存在。

                                                            (4) 检查脱敏相关约束:脱敏对象必须为基本表的数据列、脱敏列类型必须满足规格限制、脱敏列只允许加载一个脱敏函数。

                                                            (5) 检查Masking Filter是否冲突,不允许同一数据库资源在相同用户场景下触发多个策略。

                                                            其中Masking Filter冲突校验的目的是防止用户场景同时满足多个脱敏策略限制,导致策略匹配时系统无法判断应该触发哪种脱敏策略。因此在创建策略时要保证其过滤条件与现存的策略互斥,主要是判断是否存在一种用户场景能够同时满足多个MASKING FILTER。在“6.2  数据动态脱敏”小节所示的表6数据基础上,如下表中策略A和策略B是相互冲突的,而策略A和策略C是互斥的。

                                                            脱敏策略冲突或互斥场景如下所示:

                                                              策略A:CREATE MASKING POLICY mask_A MASKALL ON LABEL(creditcard_label) FILTER ON IP(’10.123.123.123’), APP(jdbc), ROLES(user1);
                                                              策略B:CREATE MASKING POLICY mask_B CREDITCARDMASKING ON LABEL(creditcard_label) FILTER ON IP(’10.123.123.123’,’10.90.132.132’), APP(jdbc, gsql), ROLES(user1);
                                                              策略C:CREATE MASKING POLICY mask_C CREDITCARDMASKING ON LABEL(creditcard_label) FILTER ON IP(’10.123.123.123’ ,’10.90.132.132’), APP(jdbc), ROLES(user2);

                                                              随后将依据策略配置信息更新系统表:

                                                              (1) 更新gs_masking_policy系统表,存储policy基本信息。

                                                              (2) 更新gs_masking_policy_actions系统表,存储策略对应的脱敏方式及脱敏对象。

                                                              (3) 更新gs_masking_policy_filter系统表,存储脱敏用户场景过滤信息。此时会将逻辑树转换为逻辑表达式字符串进行存储,在之后的敏感数据访问时该字符串将会重新转换为逻辑树进行场景校验。


                                                              为了降低策略读取I/O损耗,openGauss维护了一组线程级别的策略缓存,用于保存已配置的脱敏策略,并在策略配置后进行实时刷新。

                                                              在用户进行数据查询时,数据动态脱敏特性使用openGauss的HOOK机制,将查询编译生成的查询树钩取出来与脱敏策略进行匹配,最后将查询树按照脱敏策略内容改写成不包含敏感数据的“脱敏”查询树返还给解析层继续执行,最终实现屏蔽敏感数据的能力。其执行流程如图34所示。

                                                              图34  脱敏策略执行流程图

                                                              在对一个访问数据库资源的查询树进行脱敏之前,需要准备一份待匹配的脱敏策略集合,其依据就是用户登录信息,check_masking_policy_filter函数的任务就是将用户信息与所有的脱敏策略进行匹配,筛选出可能被查询触发的脱敏策略。最终筛选如下脱敏策略。

                                                              (1) 若脱敏策略没有配置过滤条件信息,说明对所有用户生效。

                                                              (2) 若当前用户信息与脱敏策略的过滤条件匹配,则说明对当前用户生效。

                                                              在每个脱敏策略从系统表读入缓存时,需要将对应的过滤条件逻辑表达式转换为逻辑树并将逻辑树根节点存入缓存中,将其作为脱敏策略筛选条件。逻辑树结构代码如下:

                                                                class PolicyLogicalTree {
                                                                public:
                                                                   …
                                                                   bool parse_logical_expression(const gs_stl::gs_string logical_expr_str);   /*  逻辑表达式构造逻辑树入口函数  */
                                                                   bool match(const FilterData *filter_item);
                                                                   bool has_intersect(PolicyLogicalTree *arg);
                                                                private:
                                                                   gs_stl::gs_vector<PolicyLogicalNode> m_nodes;  /*  逻辑节点集合,包含了逻辑树中所有的节点  */
                                                                   gs_stl::gs_vector<int> m_flat_tree;   /*  利用数组将逻辑节点索引构造逻辑二叉树  */
                                                                   /*  逻辑表达式转换为逻辑树的递归函数  */
                                                                bool parse_logical_expression_impl(const gs_stl::gs_string logical_expr_str, int *offset, int *idx, Edirection direction);
                                                                   inline void create_node(int *idx, EnodeType type, bool has_operator_not); /*  创建单个逻辑树节点  */
                                                                   void flatten_tree();  /*  将逻辑树刷新到m_nodes集合与m_flat_tree索引中  */
                                                                   bool check_apps_intersect(string_sort_vector*, string_sort_vector*);
                                                                   bool check_roles_intersect(oid_sort_vector*, oid_sort_vector*);
                                                                   bool m_has_ip;    /*  标识整个逻辑树是否涉及ip校验  */
                                                                   bool m_has_role;  /*  标识整个逻辑树是否涉及用户名校验  */
                                                                   bool m_has_app;   /*  标识整个逻辑树是否涉及客户端校验  */
                                                                };

                                                                逻辑树节点的结构与语法解析中的FILTER节点类似,具体可以参照PolicyFilterNode结构。相关代码如下:

                                                                  struct PolicyLogicalNode {
                                                                     ...
                                                                  EnodeType m_type;
                                                                  int m_left;                /*  左子节点索引  */
                                                                  int m_right;               /*  右子节点索引  */
                                                                  void make_eval(const FilterData *filter_item); /* 判断用户信息是否满足本节点子树表示的逻辑。  */
                                                                  bool m_eval_res;
                                                                  oid_sort_vector m_roles;   /*  本节点包含的用户名集合  */
                                                                  string_sort_vector m_apps; /*  本节点包含的客户端名称集合  */
                                                                  IPRange m_ip_range;        /* 本节点包含的IP  */
                                                                  };

                                                                  当需要将逻辑表达式转变为逻辑树时,parse_logical_expression_impl函数将对逻辑表达式字符串进行递归解析,识别出表达式包含的操作符(and或or)以及过滤条件信息(ip、roles、app),构造出PolicyLogicalNode并使用左右子节点索引(m_left、m_right)链接起来形成逻辑树并将每个节点存入m_nodes中,最终利用m_nodes构造m_flat_tree数组来模拟二叉树。

                                                                  m_flat_tree数组的作用是标记逻辑树节点间关系以及标识哪些节点是逻辑树的叶子节点。当用户信息与逻辑树某节点进行匹配时,首先需要与其左右子树进行匹配,然后根据该节点的逻辑运算符来判断是否满足过滤条件要求,而左右子树的判断结果又依赖于它们的子树的结果,因此这种递归判断方法首先将会是取叶子节点进行用户信息匹配。

                                                                  openGauss使用“自底向上”的方式来进行用于信息与逻辑树的匹配。从m_flat_tree末尾(叶子节点)进行匹配,将匹配结果记录下来,当匹配到非叶子节点时(op节点)只需使用其左右子节点结果进行判断即可,最终实现整个逻辑树的匹配。在例1脱敏策略配置示例中创建脱敏策略后,当用户使用非受限的客户端访问敏感数据时,逻辑树匹配结果如图35所示。

                                                                  图35  逻辑树匹配示例

                                                                  在筛选出脱敏策略后,就需要对查询树所有TargetEntry进行识别和策略匹配。从openGauss源码可以看到,脱敏策略支持对SubLink、Aggref、OpExpr、RelabelType、FuncExpr、CoerceViaIO、Var类型的节点进行解析识别。数据脱敏的核心思路是:Var类型节点代表了访问的数据库资源,而非Var类型节点可能包含Var节点;因此需要根据其参数递归的寻找Var节点,最后将识别到的所有Var节点进行策略匹配并根据策略内容进行节点替换。

                                                                  识别脱敏节点源码如下:

                                                                    static bool mask_expr_node(ParseState *pstate, Expr*& expr,
                                                                       const policy_set *policy_ids, masking_result *result, List* rtable, bool can_mask)
                                                                    {
                                                                       if (expr == NULL) {
                                                                           return false;
                                                                       }
                                                                       switch (nodeTag(expr)) {
                                                                           case T_SubLink:
                                                                               ... /*  解析SubLink节点  */
                                                                           case T_FuncExpr:
                                                                               ... /*  解析FuncExpr节点  */
                                                                           case T_Var:
                                                                               return handle_masking_node(pstate, expr, policy_ids, result, rtable, can_mask); /*  进入最后脱敏处理过程  */
                                                                           break;
                                                                           case T_RelabelType:
                                                                               ... /*  解析RelabelType节点  */
                                                                           case T_CoerceViaIO:
                                                                               ... /*  解析CoerceViaIO节点  */
                                                                           case T_Aggref:
                                                                               ... /*  解析Aggref节点  */
                                                                           case T_OpExpr:
                                                                               ... /*  解析OpExpr节点  */
                                                                           default:
                                                                               break;
                                                                       }
                                                                       return false;
                                                                    }

                                                                    在匹配脱敏策略时,首先需要将识别出的Var节点进行解析,将其转为PolicyLabelItem,该数据结构存储了数据列的全部路径信息,然后将其与已过滤出的脱敏策略集合进行匹配;若某个脱敏策略对应的数据库资源对象与PolicyLabelItem一致,将已匹配到的脱敏策略指定的方式替换该Var节点。相关数据结构PolicyLabelItem的代码如下:

                                                                      struct PolicyLabelItem {
                                                                         ...
                                                                         void get_fqdn_value(gs_stl::gs_string *value) const;


                                                                         bool operator < (const PolicyLabelItem& arg) const;
                                                                         bool operator == (const PolicyLabelItem& arg) const;
                                                                         bool empty() const {return strlen(m_column) == 0;}
                                                                         void set_object(const char *obj, int obj_type = 0);
                                                                         void set_object(Oid objid, int obj_type = 0);
                                                                         Oid            m_schema;     /*  数据库资源所属的namespace OID  */
                                                                         Oid            m_object;     /*  数据库资源所属的table OID  */
                                                                         char           m_column[256];/*  列名
                                                                         int            m_obj_type;   /*  资源类型,数据动态脱敏仅支持对column生效  */
                                                                      };

                                                                      脱敏策略匹配成功后,将会根据策略内容替换包含敏感信息的Var节点,使之外嵌脱敏函数。最后将修改后的查询树返还给解析器继续执行,最终敏感数据将会在脱敏函数的作用下以脱敏的形式返回给客户端。“9.6.2 数据动态脱敏”小节中例9-1脱敏策略配置示例步骤(4)中,当SELECT语句触发脱敏策略时,查询树被替换前后的数据结构如图36所示。

                                                                      图36  查询树脱敏前后的数据结构示例

                                                                      至此整个查询树已经完成了脱敏策略的匹配与重写,随后将重新回归查询解析模块并继续执行后续处理,最终系统将返回脱敏后的数据结果。

                                                                      6.3  密态等值查询

                                                                      除了传统的数据存储加密和数据脱敏等数据保护技术外,openGauss从1.1.0版本开始支持了一种全新的数据全生命周期保护方案:全密态数据库机制。在这种机制下数据在客户端就被加密,从客户端传输到数据库内核,到在内核中完成查询运算,到返回结果给客户端,数据始终处于加密状态,而数据加解密所需的密钥则由用户持有;从而实现了数据拥有者和数据处理者的数据权属分离,有效规避由内鬼和不可信第三方等威胁造成的数据泄漏风险。

                                                                      本小节重点介绍全密态数据库的第一阶段能力——密态等值查询。与非加密数据库相比,密态等值查询主要提供以下能力。

                                                                      (1) 数据加密:openGauss通过客户端驱动加密敏感数据,保证敏感数据明文不在除客户端驱动外的地方存在。遵循密钥分级原则将密钥分为数据加密密钥和密钥加密密钥,客户端驱动仅需要妥善保管密钥加密密钥即可保证只有自己才拥有解密数据密文的能力。

                                                                      (2) 数据检索:openGauss支持在用户无感知的情况下,为其提供对数据库密文进行等值检索的能力。在数据加密阶段,openGauss会将与加密相关的元数据存储在系统表中,当处理敏感数据时,客户端会自动检索加密相关元数据并对数据进行加解密。

                                                                      openGauss新增数据加解密表语法,通过采用驱动层过滤技术,在客户端的加密驱动中集成了SQL语法解析、密钥管理和敏感数据加解密等模块来处理相关语法。加密驱动源码流程如图37所示。

                                                                      图37  客户端加密驱动源码流程

                                                                      用户执行SQL查询语句时,通过Pqexec函数执行SQL语句,SQL语句在发送之前首先进入run_pre_query函数函数,通过前端解析器解析涉及密态的语法。然后在run_pre_statement函数中通过分类器对语法标签进行识别,进入对应语法的处理逻辑。在不同的处理逻辑函数中,查找出要替换的数据参数,并存储在结构体StatementData中,数据结构如图38所示。最后通过replace_raw_values函数重构SQL语句,将其发送给服务端。在PqgetResult函数接收到从服务端返回的数据时,若是加密数据类型,则用deprocess_value函数对加密数据进行解密。接收完数据后还需要在run_post_query函数中刷新相应的缓存。

                                                                      图38  客户端加密驱动数据结构

                                                                      openGauss密态数据库采用列级加密,用户在创建加密表的时候需要指定加密列的列加密密钥(Column Encryption Key,CEK)和加密类型,以确定该数据列以何种方式进行加密。同时,在创建表前,应该先创建客户端主密钥(client master key,CMK)。

                                                                      整个加密步骤和语法可简化为如下3个阶段:创建客户端密钥CMK、创建列加密密钥CEK和创建加密表。下面将结合一个具体示例对密态等值查询特性进行详细介绍。

                                                                      密态等值查询示例如下。

                                                                      (1) 创建CMK客户端主密钥。

                                                                        CREATE CLIENT MASTER KEY cmk_1 WITH (KEY_STORE = LOCALKMS , KEY_PATH = "kms_1" , ALGORITHM = RSA_2048);

                                                                        (2) 创建CEK列加密密钥。

                                                                          CREATE COLUMN ENCRYPTION KEY cek_1 WITH VALUES (CLIENT_MASTER_KEY = cmk_1, ALGORITHM = AEAD_AES_256_CBC_HMAC_SHA256);

                                                                          (3) 创建加密表。

                                                                            CREATE TABLE creditcard_info (id_number int, name text encrypted with (column_encryption_key = cek_1, encryption_type = DETERMINISTIC), gender varchar(10) encrypted with (column_encryption_key = cek_1, encryption_type = DETERMINISTIC), salary float4 encrypted with (column_encryption_key = cek_1, encryption_type = DETERMINISTIC),credit_card varchar(19) encrypted with (column_encryption_key = cek_1, encryption_type = DETERMINISTIC));

                                                                            如示例所示,首先使用“CREATE CLIENT MASTER KEY”语法创建客户端主密钥,其所涉及的语法结构定义如下:

                                                                              /*  保存创建客户端主密钥的语法信息  */
                                                                              typedef struct CreateClientLogicGlobal {
                                                                                 NodeTag type;
                                                                                 List *global_key_name;         /*  全密态数据库主密钥名称  */
                                                                                 List *global_setting_params;  /*  全密态数据库主密钥参数,每一个元素是一个ClientLogicGlobalparam  */
                                                                              } CreateClientLogicGlobal;


                                                                              /*  保存客户端主密钥参数信息  */
                                                                              typedef struct ClientLogicGlobalParam {
                                                                                 NodeTag type;
                                                                                 ClientLogicGlobalProperty key;  /*  键  */
                                                                                 char *value;                       /*  值  */
                                                                                 unsigned int len;                 /*  值长度  */
                                                                                 int location;                     /*  位置标记  */
                                                                              } ClientLogicGlobalParam;


                                                                              /*  保存客户端主密钥参数的key的枚举类型 */
                                                                              typedef enum class ClientLogicGlobalProperty {
                                                                                 CLIENT_GLOBAL_FUNCTION,  /*  默认为encryption  */
                                                                                 CMK_KEY_STORE,             /*  目前仅支持localkms  */
                                                                                 CMK_KEY_PATH,              /*  密钥存储路径  */
                                                                                 CMK_ALGORITHM              /*  指定加密CEK的算法  */
                                                                              } ClientLogicGlobalProperty;
                                                                                CREATE CLIENT MASTER KEY cmk_1 WITH (KEY_STORE = LOCALKMS , KEY_PATH = "kms_1" , ALGORITHM = RSA_2048);

                                                                                上面命令的参数说明为:

                                                                                (1) KEY_STORE:指定管理CMK的组件或工具;目前仅支持localkms模式。

                                                                                (2) KEY_PATH:一个KEY_STORE中存储了多个CMK,而KEY_PATH用于唯一标识CMK。

                                                                                (3) ALGORITHM:CMK被用于加密CEK,该参数指定加密CEK的算法,即指定CMK的密钥类型。

                                                                                客户端主密钥创建语法本质上是将CMK的元信息解析并保存在CreateClientLogicGlobal结构体中。其中global_key_name是密钥名称,global_setting_params是一个List结构,每个节点是一个ClientLogicGlobalParam结构,以键值的形式保存着密钥的信息。客户端先通过解析器“fe_raw_parser()”解析为CreateClientLogicGlobal结构体,对其参数进行校验并发送查询语句到服务端;服务端解析为CreateClientLogicGlobal结构体并检查用户namespace等权限,CMK元信息保存在系统表中。创建CMK的总体流程如图39所示。

                                                                                图39  客户端主密钥CMK创建流程

                                                                                有了主密钥CMK,可以基于此创建CEK,下面将对CREATE COLUMN ENCRYPTION KEY语句所涉及的语法结构定义进行逐一介绍。

                                                                                CREATE COLUMN ENCRYPTION KEY语法相关数据结构:

                                                                                  /*  保存创建列加密密钥的语法信息  */
                                                                                  typedef struct CreateClientLogicColumn {
                                                                                     NodeTag type;
                                                                                     List *column_key_name;        /*  列加密密钥名称  */
                                                                                     List *column_setting_params; /*  列加密密钥参数  */
                                                                                  } CreateClientLogicColumn;


                                                                                  /*  保存列加密密钥参数,保存在CreateClientLogicColumn的column_setting_params中  */
                                                                                  typedef struct ClientLogicColumnParam {
                                                                                     NodeTag type;
                                                                                     ClientLogicColumnProperty key;
                                                                                     char *value;
                                                                                     unsigned int len;
                                                                                     List *qualname;
                                                                                     int location;
                                                                                  } ClientLogicColumnParam;


                                                                                  /*  保存列加密密钥参数的key的枚举类型  */
                                                                                  typedef enum class ClientLogicColumnProperty {
                                                                                     CLIENT_GLOBAL_SETTING,    /*  加密CEK的CMK  */
                                                                                     CEK_ALGORITHM,             /*  加密用户数据的算法  */
                                                                                     CEK_EXPECTED_VALUE,       /*  CEK密钥明文,可选参数  */
                                                                                     COLUMN_COLUMN_FUNCTION,  /*  默认为encryption  */
                                                                                  } ClientLogicColumnProperty;


                                                                                  CREATE COLUMN ENCRYPTION KEY cek_1 WITH VALUES (CLIENT_MASTER_KEY = cmk_1, ALGORITHM = AEAD_AES_256_CBC_HMAC_SHA256);

                                                                                  上面命令的参数说明为:

                                                                                  (1) CLIENT_MASTER_KEY:指定用于加密CEK的CMK对象。

                                                                                  (2) ALGORITHM:CEK被用于加密用户数据,该参数指定加密用户数据的算法,即指定CEK的密钥类型。

                                                                                  (3) ENCRYPTED_VALUE:列加密密钥的明文,默认随机生成,也可由用户指定,用户指定时密钥长度范围为28~256位。

                                                                                  列加密密钥创建语法是通过前端解析器将参数解析成CreateClientLogicColumn结构体后,通过校验指定用于加密CEK的CMK对象是否存在后加载CMK缓存,然后通过“HooksManager::ColumnSettings::pre_create”语句调用加密函数“EncryptionColumnHookExecutor::pre_create”来校验各参数并生成或加密ENCRYPTED_VALUE值,最后在“EncryptionPreProcess::set_new_query”函数中替换ENCRYPTED_VALUE参数为CEK密文,重构SQL查询语句。重构后的SQL语句发送给服务端后服务端解析为CreateClientLogicColumn结构体并检查用户namespace等权限,将CEK的信息保存在系统表中。创建CEK的总体流程如图40所示,组织结构如图41所示。

                                                                                  图40  列加密密钥CEK创建流程

                                                                                  图41  客户端主密钥CMK的组织结构

                                                                                  在对CEK参数进行解析后,使用CMK对ENCRYPTED_VALUE参数进行加密,加密完成后使用加密后的ENCRYPTED_VALUE参数和其他参数对创建CEK的语法进行重构。将输入的查询语句转换成加密查询语句的主要函数入口代码如下:

                                                                                    void EncryptionPreProcess::set_new_query(char **query, size_t query_size, StringArgs string_args, int location,
                                                                                       int encrypted_value_location, size_t encrypted_value_size, size_t quote_num)
                                                                                    {
                                                                                    for (size_t i = 0; i < string_args.Size(); i++) {
                                                                                       /*  从string_args中读取键值存到变量中  */
                                                                                           char string_to_add[MAX_KEY_ADD_LEN];
                                                                                           errno_t rc = memset_s(string_to_add, MAX_KEY_ADD_LEN, 0, MAX_KEY_ADD_LEN);
                                                                                           securec_check_c(rc, "\0", "\0");
                                                                                           size_t total_in = 0;
                                                                                           if (string_args.at(i) == NULL) {
                                                                                               continue;
                                                                                           }
                                                                                           const char *key = string_args.at(i)->key;
                                                                                           const char *value = string_args.at(i)->value;
                                                                                           const size_t vallen = string_args.at(i)->valsize;
                                                                                           if (!key || !value) {
                                                                                               Assert(false);
                                                                                               continue;
                                                                                           }
                                                                                           Assert(vallen < MAX_KEY_ADD_LEN);
                                                                                           /*  将key和value构造成encrypted_value = '密文值'的形式  */
                                                                                           check_strncat_s(strncat_s(string_to_add, MAX_KEY_ADD_LEN, key, strlen(key)));
                                                                                           total_in += strlen(key);
                                                                                           check_strncat_s(strncat_s(string_to_add, MAX_KEY_ADD_LEN, "=\'", strlen("=\'")));
                                                                                           total_in += strlen("=\'");
                                                                                           check_strncat_s(strncat_s(string_to_add, MAX_KEY_ADD_LEN, value, vallen));
                                                                                           total_in += vallen;
                                                                                           check_strncat_s(strncat_s(string_to_add, MAX_KEY_ADD_LEN, "\'", strlen("\'")));
                                                                                           total_in += strlen("\'");
                                                                                           Assert(total_in < MAX_KEY_ADD_LEN);
                                                                                           /*  encrypted_value_location不为空,则说明用户提供EXPECTED_VALUE,将明文值替换成密文值   */
                                                                                           if (encrypted_value_location && encrypted_value_size) {
                                                                                               *query = (char *)libpq_realloc(*query, query_size, query_size + vallen + 1);
                                                                                               if (*query == NULL) {
                                                                                                   return;
                                                                                               }
                                                                                               check_memset_s(memset_s(*query + query_size, vallen + 1, 0, vallen + 1));
                                                                                               char *replace_dest = *query + encrypted_value_location + strlen("\'");
                                                                                               char *move_src =
                                                                                                   *query + encrypted_value_location + encrypted_value_size + quote_num + strlen("\'");
                                                                                               char *move_dest = *query + encrypted_value_location + vallen + strlen("\'");
                                                                                               check_memmove_s(memmove_s(move_dest,
                                                                                                   query_size - encrypted_value_location - encrypted_value_size - strlen("\'") + 1,
                                                                                                   move_src,
                                                                                                   query_size - encrypted_value_location - encrypted_value_size - strlen("\'")));
                                                                                               query_size = query_size + vallen - encrypted_value_size;
                                                                                               check_memcpy_s(memcpy_s(replace_dest, query_size - encrypted_value_location, value, vallen));
                                                                                           } else {
                                                                                    /*  EXPECTED_VALUE是随机生成的,则直接插入原先的语句中  */
                                                                                               check_strcat_s(strcat_s(string_to_add, MAX_KEY_ADD_LEN, ","));
                                                                                               size_t string_to_add_size = strlen(string_to_add);
                                                                                               *query = (char *)libpq_realloc(*query, query_size, query_size + string_to_add_size + 1);
                                                                                               if (*query == NULL) {
                                                                                                   return;
                                                                                               }
                                                                                               check_memmove_s(memmove_s(*query + location + string_to_add_size, query_size - location, *query + location,
                                                                                                   query_size - location));
                                                                                               query_size += string_to_add_size;
                                                                                               check_memcpy_s(memcpy_s(*query + location, query_size - location, string_to_add, string_to_add_size));
                                                                                           }
                                                                                           query[0][query_size] = '\0';
                                                                                       }
                                                                                       return;
                                                                                    }

                                                                                    接下来创建加密表。

                                                                                      CREATE TABLE creditcard_info (id_number int, name text encrypted with (column_encryption_key = cek_1, encryption_type = DETERMINISTIC), gender varchar(10) encrypted with (column_encryption_key = cek_1, encryption_type = DETERMINISTIC), salary float4 encrypted with (column_encryption_key = cek_1, encryption_type = DETERMINISTIC),credit_card varchar(19) encrypted with (column_encryption_key = cek_1, encryption_type = DETERMINISTIC));

                                                                                      创建加密表的SQL语句在语法解析后进入CreateStmt函数处理逻辑,在run_pre_create_statement函数中,对CreateStmt->tableElts中每个ListCell进行判断,当前加密表仍存在一定的约束,加密表列定义及约束处理函数段代码如下:

                                                                                        bool createStmtProcessor::run_pre_create_statement(const CreateStmt * const stmt, StatementData *statement_data)
                                                                                        {
                                                                                           …
                                                                                         /*  加密表列定义及约束处理  */
                                                                                           foreach (elements, stmt->tableElts) {
                                                                                               Node *element = (Node *)lfirst(elements);
                                                                                               switch (nodeTag(element)) {
                                                                                                   case T_ColumnDef: {
                                                                                                       …
                                                                                                      /*  校验distribute by是否符合规格  */
                                                                                                       if (column->colname != NULL &&
                                                                                                           !check_distributeby(stmt->distributeby, column->colname)) {
                                                                                                           return false;
                                                                                                       }
                                                                                                      /*  列定义处理,存储加密类型,加密密钥等信息  */
                                                                                                       if (!process_column_defintion(column, element, &expr_vec, &cached_columns,
                                                                                                           &cached_columns_for_defaults, statement_data)) {
                                                                                                           return false;
                                                                                                       }
                                                                                                       break;
                                                                                                   }
                                                                                                   /*  处理check, unique 或其他约束  */
                                                                                                   case T_Constraint: {
                                                                                                       Constraint *constraint = (Constraint*)element;
                                                                                                       if (constraint->keys != NULL) {
                                                                                                           ListCell *ixcell = NULL;
                                                                                                           foreach (ixcell, constraint->keys) {
                                                                                                               char *ikname = strVal(lfirst(ixcell));
                                                                                                               for (size_t i = 0; i < cached_columns.size(); i++) {
                                                                                                                   if (strcmp((cached_columns.at(i))->get_col_name(), ikname) == 0 && !check_constraint(
                                                                                                                       constraint, cached_columns.at(i)->get_data_type(), ikname, &cached_columns)) {
                                                                                                                       return false;
                                                                                                                   }
                                                                                                               }
                                                                                                           }
                                                                                                       } else if (constraint->raw_expr != NULL) {
                                                                                                           if (!transform_expr(constraint->raw_expr, "", &cached_columns)) {
                                                                                                               return false;
                                                                                                           }
                                                                                                       }
                                                                                                       break;
                                                                                                   }
                                                                                                   default:
                                                                                                       break;
                                                                                               }
                                                                                           }
                                                                                            …
                                                                                           /*  加密约束中需要加密的明文数据  */
                                                                                           if (!RawValues::get_raw_values_from_consts_vec(&expr_vec, statement_data, 0, &raw_values_list)) {
                                                                                               return false;
                                                                                           }
                                                                                           return ValuesProcessor::process_values(statement_data, &cached_columns_for_defaults, 1,
                                                                                               &raw_values_list);
                                                                                        }

                                                                                        在将创建加密表的查询语句发送给服务端后,服务端创建成功并返回执行成功的消息。数据加密驱动程序能够实现在数据发送到数据库之前透明地加密数据,数据在整个语句的处理过程中以密文形式存在,在返回结果时,解密返回的数据集,从而保证整个过程对用户是透明、无感知的。

                                                                                        定义了完整的加密表后,用户就可以用正常的方式将数据插入到表中。完整的加密过程见encrypt_data函数,其核心逻辑代码如下所示:

                                                                                          int encrypt_data(const unsigned char *plain_text, int plain_text_length, const AeadAesHamcEncKey &column_encryption_key,
                                                                                             EncryptionType encryption_type, unsigned char *result, ColumnEncryptionAlgorithm column_encryption_algorithm)
                                                                                          {
                                                                                             ……
                                                                                             /*  得到16位的iv值  */
                                                                                             unsigned char _iv [g_key_size + 1] = {0};
                                                                                          unsigned char iv_truncated[g_iv_size + 1] = {0};
                                                                                          /*  确定性加密,则通过hmac_sha256生成iv  */
                                                                                             if (encryption_type == EncryptionType::DETERMINISTIC_TYPE) {
                                                                                                    hmac_sha256(column_encryption_key.get_iv_key(), g_auth_tag_size, plain_text, plain_text_length, _iv);
                                                                                                ……
                                                                                          } else {
                                                                                          /*  随机加密,则随机生成iv  */
                                                                                                 if (encryption_type != EncryptionType::RANDOMIZED_TYPE) {
                                                                                                     return 0;
                                                                                                 }
                                                                                                 int res = RAND_priv_bytes(iv_truncated, g_block_size);
                                                                                                 if (res != 1) {
                                                                                                     return 0;
                                                                                                 }
                                                                                             }
                                                                                             int cipherStart = g_algo_version_size + g_auth_tag_size + g_iv_size;
                                                                                             /*  调用encrypt计算密文  */
                                                                                          int cipherTextSize = encrypt(plain_text, plain_text_length, column_encryption_key.get_encyption_key(), iv_truncated,
                                                                                                 result + cipherStart, column_encryption_algorithm);
                                                                                            ……
                                                                                             int ivStartIndex = g_auth_tag_size + g_algo_version_size;
                                                                                             res = memcpy_s(result + ivStartIndex, g_iv_size, iv_truncated, g_iv_size);
                                                                                             securec_check_c(res, "\0", "\0");
                                                                                             /*  计算 HMAC  */
                                                                                             int hmacDataSize = g_algo_version_size + g_iv_size + cipherTextSize;
                                                                                             hmac_sha256(column_encryption_key.get_mac_key(), g_auth_tag_size,
                                                                                                         result + g_auth_tag_size, hmacDataSize, result);
                                                                                             return (g_auth_tag_size + hmacDataSize);
                                                                                          }

                                                                                          openGauss密态数据库在进行等值查询的时候,整个查询过程对用户是无感知的,虽然存储在数据库中的数据是密文形式,但在展示数据给用户的时候会将密文数据进行解密处理。以从加密表中进行等值查询语句为例,整个语句处理过程如图9-42所示。客户端解析SELECT查询语句中的列属性信息,如果缓存已有则从缓存中提取列属性信息;如果缓存中找不到,需要从服务端查询该信息,并缓存。列加密密钥CEK是以密文形式存储在服务端,客户端需要解密CEK。然后用其加密SELECT查询语句中条件参数。加密后的SELECT查询语句发送给数据库服务端执行完成后,返回加密的查询结果集。客户端用解密后的列加密密钥CEK解密SELECT查询结果集,并返回解密后的明文结果集给应用端。

                                                                                          图42  SELECT语句时序图

                                                                                          等值查询处理run_pre_insert_statement函数,其核心逻辑代码如下所示:

                                                                                            bool Processor::run_pre_select_statement(const SelectStmt * const select_stmt, const SetOperation &parent_set_operation,
                                                                                               const bool &parent_all, StatementData *statement_data, ICachedColumns *cached_columns, ICachedColumns *cached_columns_parents)
                                                                                            {
                                                                                            bool select_res = false;
                                                                                            /*  处理SELECT语句中的集合操作  */
                                                                                               if (select_stmt->op != SETOP_NONE) {
                                                                                                   select_res = process_select_set_operation(select_stmt, statement_data, cached_columns);
                                                                                                   RETURN_IF(!select_res);
                                                                                               }
                                                                                               /*  处理WHERE子句  */
                                                                                               ExprPartsList where_expr_parts_list;
                                                                                               select_res = exprProcessor::expand_expr(select_stmt->whereClause, statement_data, &where_expr_parts_list);
                                                                                               RETURN_IF(!select_res);
                                                                                            ……
                                                                                               /*  从FROM子句中获取缓存加密列  */
                                                                                            CachedColumns cached_columns_from(false, true);
                                                                                               select_res = run_pre_from_list_statement(select_stmt->fromClause, statement_data, &cached_columns_from,
                                                                                                   cached_columns_parents);
                                                                                            ……
                                                                                            /*  将查询的加密列放在cached_columns结构中  */
                                                                                               for (size_t i = 0; i < cached_columns_from.size(); i++) {
                                                                                                   if (find_in_name_map(target_list, cached_columns_from.at(i)->get_col_name())) {
                                                                                                       CachedColumn *target = new (std::nothrow) CachedColumn(cached_columns_from.at(i));
                                                                                                       if (target == NULL) {
                                                                                                           fprintf(stderr, "failed to new CachedColumn object\n");
                                                                                                           return false;
                                                                                                       }
                                                                                                       cached_columns->push(target);
                                                                                                   }
                                                                                               }
                                                                                               if (cached_columns_from.is_empty()) {
                                                                                                   return true;
                                                                                               }
                                                                                               /*  加密列不支持ORDER BY(排序)操作  */
                                                                                               if (!deal_order_by_statement(select_stmt, cached_columns)) {
                                                                                                   return false;
                                                                                               }
                                                                                             
                                                                                            /*  将WHERE子句中加密的值进行加密处理  */
                                                                                               if (!WhereClauseProcessor::process(&cached_columns_from, &where_expr_parts_list, statement_data)) {
                                                                                                   return false;
                                                                                            }
                                                                                            ……
                                                                                               return true;
                                                                                            }

                                                                                            完整的客户端密文解密函数代码如下所示: 

                                                                                              int decrypt_data(const unsigned char *cipher_text, int cipher_text_length,
                                                                                                 const AeadAesHamcEncKey &column_encryption_key, unsigned char *decryptedtext,
                                                                                                 ColumnEncryptionAlgorithm column_encryption_algorithm)
                                                                                              {
                                                                                                 if (cipher_text == NULL || cipher_text_length <= 0 || decryptedtext == NULL) {
                                                                                                     return 0;
                                                                                                 }
                                                                                                 /*  校验密文长度  */
                                                                                                 if (cipher_text_length < min_ciph_len_in_bytes_with_authen_tag) {
                                                                                                     printf("ERROR(CLIENT): The length of cipher_text is invalid, cannot decrypt.\n");
                                                                                                     return 0;
                                                                                                 }
                                                                                                 /*  校验密文中的版本号  */
                                                                                                 if (cipher_text[g_auth_tag_size] != '1') {
                                                                                                     printf("ERROR(CLIENT): Version byte of cipher_text is invalid, cannot decrypt.\n");
                                                                                                     return 0;
                                                                                                 }
                                                                                                 ……
                                                                                                 /*  计算MAC标签  */
                                                                                                 unsigned char authenticationTag [g_auth_tag_size] = {0};
                                                                                                 int HMAC_length = cipher_text_length - g_auth_tag_size;
                                                                                                 int res = hmac_sha256(column_encryption_key.get_mac_key(), g_auth_tag_size,
                                                                                                     cipher_text + g_auth_tag_size, HMAC_length, authenticationTag);
                                                                                                 if (res != 1) {
                                                                                                     printf("ERROR(CLIENT): Fail to compute a keyed hash of a given text.\n");
                                                                                                     return 0;
                                                                                                 }
                                                                                                 /*  校验密文是否被篡改  */
                                                                                                 int cmp_result = my_memcmp(authenticationTag, cipher_text, g_auth_tag_size);
                                                                                                 /*  解密数据  */
                                                                                                 int decryptedtext_len = decrypt(cipher_text + cipher_start_index, cipher_value_length,
                                                                                                     column_encryption_key.get_encyption_key(), iv, decryptedtext, column_encryption_algorithm);
                                                                                                 if (decryptedtext_len < 0) {
                                                                                                     return 0;
                                                                                                 }
                                                                                                 decryptedtext[decryptedtext_len] = '\0';
                                                                                                 return decryptedtext_len;
                                                                                              }

                                                                                              七、 小结

                                                                                              随着信息安全的挑战越来越严重,保护系统安全可靠的运行,守护用户的数据隐私安全成为是当前数据库产品必须具备的能力。openGauss将逐步构建更加完备的安全能力,从身份认证、角色模型、权限管理、审计追踪、数据加解密等多维度来守护系统和数据安全。

                                                                                              本章节详细解析了openGauss的安全架构,并通过关键数据结构和关键函数代码解读描述每一种安全防护机制的实现细节,这些代码实现细节将有助于开发者了解openGauss的安全原理,并基于最新的安全标准来不断地优化和改善安全机制。

                                                                                              往期推荐

                                                                                              openGauss数据库源码解析系列文章——安全管理源码解析(一)

                                                                                              openGauss数据库源码解析系列文章——安全管理源码解析(二)

                                                                                              文章转载自openGauss,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                                                              评论