暂无图片
暂无图片
4
暂无图片
暂无图片
暂无图片

[MYSQL] 自定义mysql脱敏中间件 -- 对指定连接进行指定字段的数据脱敏

原创 大大刺猬 2025-04-08
275

需求

昨天群友有这么一个需求: 对应特定的连接查询特定的表字段的时候,对其进行脱敏. 比如: select name, phone from tblname where id=xxx 查询出来的phone需要是脱敏的.即显示为:152****6666这种样子.
image.png

需求分析

对于数据脱敏, 我们可以在数据库层做, 也可以在返回数据的时候动手脚.

  1. 比较快速的是使用视图, 在建视图的时候,就对数据进行脱敏, 然后只给对应连接该视图的权限. 脱敏函数可以参考percona的插件data_masking.so的.
  2. 使用第三方的脱敏工具, 但是可能使用较为复杂.
  3. 自定义:做一个中间件转发业务sql,如果返回数据匹配到相关相关字段则进行脱敏

实现方法

前两者都有现成的方法. 我们主要看自定义的那一种.
image.png

那么具体该怎么做呢?

客户端发送的那一条线, 完全是转发, 不需要额外处理.

主要是服务端返回给客户端的时候需要注意下, 主要涉及3种包:

  1. column_count 记录数据行数的
  2. column 记录字段元数据信息的, 比如字段名字,字段类型等
  3. row 具体的数据行.
  4. EOF 数据返回完成.

image.png

column_count比较简单, 只有size(存储字段列数需要几个字节,)和字段列数
row 是每行数据一个包, 格式是所有字段放一堆, 使用大小+值的格式.
EOF 大小位置是0xfe,后面跟的实际上只有warnings和status_flags. 如下:
image.png

主要是column稍微复杂一丢丢(其实我们之前也讲过),我们这里以ColumnDefinition41为例, 结构如下:

字符对象往往由 长度+值构成, 比如 x03def

对象 大小(字节) 描述
catalog lenenc catalog, 固定为def
schema lenenc 数据库名
table lenenc 虚拟表名
org_table lenenc 实际表名
name lenenc 虚拟列名
org_name lenenc 实际列名
length of fixed length fields 1 固定为0x0c
character_set 2 字符集(collate)
column_length 4 这个字段的最大长度
type 1 字段类型
flags 2 一些flag, 比如是否不为空等
decimals 1 decimal的小数部分.
xx 2 填充,固定0x00*2

虚拟表/列名实际上就是 SQL语句中的as NAME里的name, 也是展示时显示的名字. 比如select id as colname from t1 as tblname; 则: table=tblname org_table=t1 name=colname org_name=id

character_set(字符集)在我们连接的时候讲过, 那时还得拼接一下, 现在直接给2字节了.

type 表示字段的类型, 这个类型是mysql层的, 不是存储引擎层. 具体如下(这里以5.7.38为主的, 老版本可能还有其它值)

1 : tinyint 2 : smallint 3 : int 4 : float 5 : double 7 : timestamp 8 : bigint 9 : mediumint 10 : date 11 : time 12 : datetime 13 : year 14 : date 15 : varchar 16 : bit 17 : timestamp 18 : datetime 19 : time 245 : json 246 : decimal 247 : enum 248 : set 249 : tinyblob 250 : mediumblob 251 : longblob 252 : blob 253 : varchar 254 : char 255 : geometry

比如上面我们解析出来user的类型为254, 则表示user为char类型.

flags 是一些元数据信息, 比如能否为空之类的. 详情如下:

NOT_NULL_FLAG 1 字段不能为空 PRI_KEY_FLAG 2 字段是主键(或者主键的一部分) UNIQUE_KEY_FLAG 4 字段是唯一索引(或者唯一索引的一部分) MULTIPLE_KEY_FLAG 8 字段是索引的一部分 BLOB_FLAG 16 字段是blob UNSIGNED_FLAG 32 字段无符号 ZEROFILL_FLAG 64 字段使用0填充 BINARY_FLAG 128 字段是binary类型 ENUM_FLAG 256 字段是枚举类型 AUTO_INCREMENT_FLAG 512 字段是自增类型 TIMESTAMP_FLAG 1024 字段是时间戳类型 SET_FLAG 2048 字段是set类型 NO_DEFAULT_VALUE_FLAG 4096 字段没有默认值 ON_UPDATE_NOW_FLAG 8192 字段更新的时候设置为now NUM_FLAG 32768 字段是num(for clients) PART_KEY_FLAG 16384 字段是某个索引的一部分(Intern) GROUP_FLAG 32768 UNIQUE_FLAG 65536

来看个例子b'+\x00\x00\x03\x03def\x05mysql\x04user\x04user\x04host\x04Host\x0c!\x00\xb4\x00\x00\x00\xfe\x83@\x00\x00\x00'

根据上面的格式, 我们就得到:

catalog def schema mysql table user org_table user name host org_name Host -- 没想到吧, 实际上mysql.user中host的第一个字母是大写, (User字段也是) character_set 33 #show collation where id=33; 得到是utf8_general_ci column_length 180 # 60*3 type 254 # char flags 16515 # 0,1 bit是1, 说明是主键的一部分,且不能为空. decimal 0 # 不是decimal

扯远了, 比如我们匹配到服务端给客户端返回数据行了, 就匹配下字段信息, 如果匹配上了, 后续返回数据的时候就对其进行脱敏, 比如: 字符串类型就只取首位2个字符.

演示

我们还是直接演示吧, 脚本就使用之前的流量镜像脚本稍微修改一下, 脚本见文末.

我们这里实现比较简单, 没有记录数据类型之类的脱敏可能需要的信息.

准备数据:

create table db1.t20250408(id int, name varchar(200),phone char(11)); insert into db1.t20250408 values(1,'ddcw','12345678901'); insert into db1.t20250408 values(2,'ddcw2','12345678902'); insert into db1.t20250408 values(3,'ddcw3','12345678903'); insert into db1.t20250408 values(4,'ddcw4','12345678904');

启动我们的中间件

python3 mysql_datamasking.py

image.png

登录我们自制的中间件,并查询数据
image.png

哈哈成功了, 证明此方案可行.

总结

实际使用还是推荐使用现成的工具,哪怕麻烦点,哪怕花点money.求的就是一个稳定.

但对于测试环境, 或者复杂的需求, 我们就可以自行编写中间件来实现了. 比如我只想对某项连接做脱敏,账号还是使用以前的账号, 而且如果很多表都有某个字段,还都要脱敏的话, 建视图可能会比较麻烦. 这时我们就可以使用自己写的中间件来实现了.

参考:
https://docs.percona.com/percona-server/8.0/install-data-masking-plugin.html
https://cloud.tencent.com/developer/article/2245416

附脚本

本脚本只做了理论上的实现,未做严格验证,使用的时候需要注意下.

import struct from threading import Thread from multiprocessing import Process import socket import time import sys import ssl def btoint(bdata,t='little'): return int.from_bytes(bdata,t) def read_pack(rf): pack_header = rf.read(4) if len(pack_header) < 4: print(pack_header,' bye!') sys.exit(2) btrl, btrh, packet_seq = struct.unpack("<HBB", pack_header) pack_size = btrl + (btrh << 16) bdata = rf.read(pack_size) return pack_header+bdata def _lenenc_int_unpack(data): i = struct.unpack('<B',data[:1])[0] if i == 0xfc: return 3,struct.unpack('<H',data[1:3])[0] elif i == 0xfd: return 4,struct.unpack('<I',data[1:3]+b'0x00')[0] elif i == 0xfe: return 9,struct.unpack('<Q',data[1:9])[0] elif i < 0xfc: return 1, i else: return -1,-1 def _lenenc_int_pack(i): if i < 0xFB: return bytes([i]) elif i < (1 << 16): return b"\xfc" + struct.pack("<H", i) elif i < (1 << 24): return b"\xfd" + struct.pack("<I", i)[:3] elif i < (1 << 64): return b"\xfe" + struct.pack("<Q", i) else: return 0x00 # 匹配哪些字段需要脱敏, 比如我们只用看是否有phone字段 def match_column(data): # 懒得解析ColumnDefinition41 了,直接find吧... return True if data.find(b'phone') > 0 else False # 开始脱敏 def datamask_01(bdata,col_mask,column_count): data = [] offset = 4 for i in range(column_count): x,size = _lenenc_int_unpack(bdata[offset:][:4]) offset += x tdata = bdata[offset:offset+size] if col_mask & (1<<i) > 0: # 匹配上了,那就脱敏吧.. tdata = tdata[:2]+b"****"+tdata[-2:] data.append(tdata) offset += size # 重新封包 rdata = b''.join([ _lenenc_int_pack(len(x))+x for x in data]) rdata = struct.pack('<L',len(rdata))[:3] + bdata[3:4] + rdata return rdata class mmonitor(object): def __init__(self): self.host = '0.0.0.0' self.port = 3306 self.server = ('192.168.101.21',3314,) self.cert = '/data/mysql_3314/mysqldata/server-cert.pem' self.key = '/data/mysql_3314/mysqldata/server-key.pem' def handler_msg(self,rf,sock,f): while True: bdata = read_pack(rf) sock.sendall(bdata) #print(f'{f}',btoint(bdata[3:4]),bdata) def handler_msg_toclient(self,rf,sock,f): while True: bdata = read_pack(rf) print(bdata) if bdata[:4] == b'\x01\x00\x00\x01' and len(bdata) <= 7: # column count sock.sendall(bdata) _,column_count = _lenenc_int_unpack(bdata[4:]) col_mask = 0 # 匹配字段 for i in range(column_count): bdata = read_pack(rf) sock.sendall(bdata) if match_column(bdata): col_mask |= (1<<i) # 读取字段数据并脱敏返回 while True: bdata = read_pack(rf) if bdata[4] == 0xfe and len(bdata) == 11: #EOF sock.sendall(bdata) break print(bdata,col_mask,column_count,'XXXXXXXXXXXXXXXx') bdata = datamask_01(bdata,col_mask,column_count) sock.sendall(bdata) else: sock.sendall(bdata) def handler(self,conn,addr): sock = socket.create_connection((self.server[0], self.server[1])) server_rf = sock.makefile('rb') bdata = read_pack(server_rf) conn.sendall(bdata) print('S->C: ',btoint(bdata[3:4]),bdata) client_rf = conn.makefile('rb') bdata = read_pack(client_rf) print('C->S: ',btoint(bdata[3:4]),bdata) sock.sendall(bdata) if len(bdata) < 38: #封装为SSL (32+4) #print('SSL') #封装客户端的SSL (因为相对于client, 这是server角色) context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) context.load_cert_chain(certfile=self.cert, keyfile=self.key) conn = context.wrap_socket(conn, server_side=True) client_rf = conn.makefile('rb') #封装到server的SSL sock = ssl.wrap_socket(sock) server_rf = sock.makefile('rb') t1 = Process(target=self.handler_msg,args=(client_rf,sock,'C->S: ')) t2 = Process(target=self.handler_msg_toclient,args=(server_rf,conn,'S->C: ')) t1.start() t2.start() t1.join() t2.join() def init(self): socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) socket_server.bind((self.host, self.port)) socket_server.listen(12345) #设置连接数 self.socket_server = socket_server accept_client_thread = Thread(target=self.accept_client,) accept_client_thread.start() accept_client_thread.join() def accept_client(self,): while True: conn, addr = self.socket_server.accept() p = Process(target=self.handler,args=(conn,addr),) p.start() aa = mmonitor() aa.init()
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论