需求
昨天群友有这么一个需求: 对应特定的连接查询特定的表字段的时候,对其进行脱敏. 比如: select name, phone from tblname where id=xxx 查询出来的phone需要是脱敏的.即显示为:152****6666这种样子.

需求分析
对于数据脱敏, 我们可以在数据库层做, 也可以在返回数据的时候动手脚.
- 比较快速的是使用视图, 在建视图的时候,就对数据进行脱敏, 然后只给对应连接该视图的权限. 脱敏函数可以参考percona的插件
data_masking.so的. - 使用第三方的脱敏工具, 但是可能使用较为复杂.
- 自定义:做一个中间件转发业务sql,如果返回数据匹配到相关相关字段则进行脱敏
实现方法
前两者都有现成的方法. 我们主要看自定义的那一种.

那么具体该怎么做呢?
客户端发送的那一条线, 完全是转发, 不需要额外处理.
主要是服务端返回给客户端的时候需要注意下, 主要涉及3种包:
- column_count 记录数据行数的
- column 记录字段元数据信息的, 比如字段名字,字段类型等
- row 具体的数据行.
- EOF 数据返回完成.

column_count比较简单, 只有size(存储字段列数需要几个字节,)和字段列数
row 是每行数据一个包, 格式是所有字段放一堆, 使用大小+值的格式.
EOF 大小位置是0xfe,后面跟的实际上只有warnings和status_flags. 如下:

主要是column稍微复杂一丢丢(其实我们之前也讲过),我们这里以ColumnDefinition41为例, 结构如下:
字符对象往往由 长度+值构成, 比如 x03def
| 对象 | 大小(字节) | 描述 |
|---|---|---|
| catalog | lenenc | catalog, 固定为def |
| schema | lenenc | 数据库名 |
| table | lenenc | 虚拟表名 |
| org_table | lenenc | 实际表名 |
| name | lenenc | 虚拟列名 |
| org_name | lenenc | 实际列名 |
| length of fixed length fields | 1 | 固定为0x0c |
| character_set | 2 | 字符集(collate) |
| column_length | 4 | 这个字段的最大长度 |
| type | 1 | 字段类型 |
| flags | 2 | 一些flag, 比如是否不为空等 |
| decimals | 1 | decimal的小数部分. |
| xx | 2 | 填充,固定0x00*2 |
虚拟表/列名实际上就是 SQL语句中的as NAME里的name, 也是展示时显示的名字. 比如select id as colname from t1 as tblname; 则: table=tblname org_table=t1 name=colname org_name=id
character_set(字符集)在我们连接的时候讲过, 那时还得拼接一下, 现在直接给2字节了.
type 表示字段的类型, 这个类型是mysql层的, 不是存储引擎层. 具体如下(这里以5.7.38为主的, 老版本可能还有其它值)
1 : tinyint 2 : smallint 3 : int 4 : float 5 : double 7 : timestamp 8 : bigint 9 : mediumint 10 : date 11 : time 12 : datetime 13 : year 14 : date 15 : varchar 16 : bit 17 : timestamp 18 : datetime 19 : time 245 : json 246 : decimal 247 : enum 248 : set 249 : tinyblob 250 : mediumblob 251 : longblob 252 : blob 253 : varchar 254 : char 255 : geometry
比如上面我们解析出来user的类型为254, 则表示user为char类型.
flags 是一些元数据信息, 比如能否为空之类的. 详情如下:
NOT_NULL_FLAG 1 字段不能为空 PRI_KEY_FLAG 2 字段是主键(或者主键的一部分) UNIQUE_KEY_FLAG 4 字段是唯一索引(或者唯一索引的一部分) MULTIPLE_KEY_FLAG 8 字段是索引的一部分 BLOB_FLAG 16 字段是blob UNSIGNED_FLAG 32 字段无符号 ZEROFILL_FLAG 64 字段使用0填充 BINARY_FLAG 128 字段是binary类型 ENUM_FLAG 256 字段是枚举类型 AUTO_INCREMENT_FLAG 512 字段是自增类型 TIMESTAMP_FLAG 1024 字段是时间戳类型 SET_FLAG 2048 字段是set类型 NO_DEFAULT_VALUE_FLAG 4096 字段没有默认值 ON_UPDATE_NOW_FLAG 8192 字段更新的时候设置为now NUM_FLAG 32768 字段是num(for clients) PART_KEY_FLAG 16384 字段是某个索引的一部分(Intern) GROUP_FLAG 32768 UNIQUE_FLAG 65536
来看个例子b'+\x00\x00\x03\x03def\x05mysql\x04user\x04user\x04host\x04Host\x0c!\x00\xb4\x00\x00\x00\xfe\x83@\x00\x00\x00'
根据上面的格式, 我们就得到:
catalog def schema mysql table user org_table user name host org_name Host -- 没想到吧, 实际上mysql.user中host的第一个字母是大写, (User字段也是) character_set 33 #show collation where id=33; 得到是utf8_general_ci column_length 180 # 60*3 type 254 # char flags 16515 # 0,1 bit是1, 说明是主键的一部分,且不能为空. decimal 0 # 不是decimal
扯远了, 比如我们匹配到服务端给客户端返回数据行了, 就匹配下字段信息, 如果匹配上了, 后续返回数据的时候就对其进行脱敏, 比如: 字符串类型就只取首位2个字符.
演示
我们还是直接演示吧, 脚本就使用之前的流量镜像脚本稍微修改一下, 脚本见文末.
我们这里实现比较简单, 没有记录数据类型之类的脱敏可能需要的信息.
准备数据:
create table db1.t20250408(id int, name varchar(200),phone char(11));
insert into db1.t20250408 values(1,'ddcw','12345678901');
insert into db1.t20250408 values(2,'ddcw2','12345678902');
insert into db1.t20250408 values(3,'ddcw3','12345678903');
insert into db1.t20250408 values(4,'ddcw4','12345678904');
启动我们的中间件
python3 mysql_datamasking.py

登录我们自制的中间件,并查询数据

哈哈成功了, 证明此方案可行.
总结
实际使用还是推荐使用现成的工具,哪怕麻烦点,哪怕花点money.求的就是一个稳定.
但对于测试环境, 或者复杂的需求, 我们就可以自行编写中间件来实现了. 比如我只想对某项连接做脱敏,账号还是使用以前的账号, 而且如果很多表都有某个字段,还都要脱敏的话, 建视图可能会比较麻烦. 这时我们就可以使用自己写的中间件来实现了.
参考:
https://docs.percona.com/percona-server/8.0/install-data-masking-plugin.html
https://cloud.tencent.com/developer/article/2245416
附脚本
本脚本只做了理论上的实现,未做严格验证,使用的时候需要注意下.
import struct
from threading import Thread
from multiprocessing import Process
import socket
import time
import sys
import ssl
def btoint(bdata,t='little'):
return int.from_bytes(bdata,t)
def read_pack(rf):
pack_header = rf.read(4)
if len(pack_header) < 4:
print(pack_header,' bye!')
sys.exit(2)
btrl, btrh, packet_seq = struct.unpack("<HBB", pack_header)
pack_size = btrl + (btrh << 16)
bdata = rf.read(pack_size)
return pack_header+bdata
def _lenenc_int_unpack(data):
i = struct.unpack('<B',data[:1])[0]
if i == 0xfc:
return 3,struct.unpack('<H',data[1:3])[0]
elif i == 0xfd:
return 4,struct.unpack('<I',data[1:3]+b'0x00')[0]
elif i == 0xfe:
return 9,struct.unpack('<Q',data[1:9])[0]
elif i < 0xfc:
return 1, i
else:
return -1,-1
def _lenenc_int_pack(i):
if i < 0xFB:
return bytes([i])
elif i < (1 << 16):
return b"\xfc" + struct.pack("<H", i)
elif i < (1 << 24):
return b"\xfd" + struct.pack("<I", i)[:3]
elif i < (1 << 64):
return b"\xfe" + struct.pack("<Q", i)
else:
return 0x00
# 匹配哪些字段需要脱敏, 比如我们只用看是否有phone字段
def match_column(data):
# 懒得解析ColumnDefinition41 了,直接find吧...
return True if data.find(b'phone') > 0 else False
# 开始脱敏
def datamask_01(bdata,col_mask,column_count):
data = []
offset = 4
for i in range(column_count):
x,size = _lenenc_int_unpack(bdata[offset:][:4])
offset += x
tdata = bdata[offset:offset+size]
if col_mask & (1<<i) > 0: # 匹配上了,那就脱敏吧..
tdata = tdata[:2]+b"****"+tdata[-2:]
data.append(tdata)
offset += size
# 重新封包
rdata = b''.join([ _lenenc_int_pack(len(x))+x for x in data])
rdata = struct.pack('<L',len(rdata))[:3] + bdata[3:4] + rdata
return rdata
class mmonitor(object):
def __init__(self):
self.host = '0.0.0.0'
self.port = 3306
self.server = ('192.168.101.21',3314,)
self.cert = '/data/mysql_3314/mysqldata/server-cert.pem'
self.key = '/data/mysql_3314/mysqldata/server-key.pem'
def handler_msg(self,rf,sock,f):
while True:
bdata = read_pack(rf)
sock.sendall(bdata)
#print(f'{f}',btoint(bdata[3:4]),bdata)
def handler_msg_toclient(self,rf,sock,f):
while True:
bdata = read_pack(rf)
print(bdata)
if bdata[:4] == b'\x01\x00\x00\x01' and len(bdata) <= 7: # column count
sock.sendall(bdata)
_,column_count = _lenenc_int_unpack(bdata[4:])
col_mask = 0
# 匹配字段
for i in range(column_count):
bdata = read_pack(rf)
sock.sendall(bdata)
if match_column(bdata):
col_mask |= (1<<i)
# 读取字段数据并脱敏返回
while True:
bdata = read_pack(rf)
if bdata[4] == 0xfe and len(bdata) == 11: #EOF
sock.sendall(bdata)
break
print(bdata,col_mask,column_count,'XXXXXXXXXXXXXXXx')
bdata = datamask_01(bdata,col_mask,column_count)
sock.sendall(bdata)
else:
sock.sendall(bdata)
def handler(self,conn,addr):
sock = socket.create_connection((self.server[0], self.server[1]))
server_rf = sock.makefile('rb')
bdata = read_pack(server_rf)
conn.sendall(bdata)
print('S->C: ',btoint(bdata[3:4]),bdata)
client_rf = conn.makefile('rb')
bdata = read_pack(client_rf)
print('C->S: ',btoint(bdata[3:4]),bdata)
sock.sendall(bdata)
if len(bdata) < 38: #封装为SSL (32+4)
#print('SSL')
#封装客户端的SSL (因为相对于client, 这是server角色)
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(certfile=self.cert, keyfile=self.key)
conn = context.wrap_socket(conn, server_side=True)
client_rf = conn.makefile('rb')
#封装到server的SSL
sock = ssl.wrap_socket(sock)
server_rf = sock.makefile('rb')
t1 = Process(target=self.handler_msg,args=(client_rf,sock,'C->S: '))
t2 = Process(target=self.handler_msg_toclient,args=(server_rf,conn,'S->C: '))
t1.start()
t2.start()
t1.join()
t2.join()
def init(self):
socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_server.bind((self.host, self.port))
socket_server.listen(12345) #设置连接数
self.socket_server = socket_server
accept_client_thread = Thread(target=self.accept_client,)
accept_client_thread.start()
accept_client_thread.join()
def accept_client(self,):
while True:
conn, addr = self.socket_server.accept()
p = Process(target=self.handler,args=(conn,addr),)
p.start()
aa = mmonitor()
aa.init()




