暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

MYSQL 协议 (2) 收发数据(COM_QUERY)

如果你看了 上一章 , 那你应该就明白了Mysql连接的时候都干了啥, 但是光连上也没啥用啊, 要发送SQL,接收server发来的数据.
本文主要就讲mysql客户端服务端发送数据过程(仅COM_QUERY)

发送SQL(COM_QUERY)

发送sql比较简单, 直接就是 包头加sql就行了…
image.png

bdata = struct.pack('<IB',len(sql)+1,0x03) #I:每个com_query的seq_id都从0开始,第4字节固定为0, 所以直接用I, +1:com_query占>用1字节, 0x03:com_query bdata += sql.encode() self.sock.sendall(bdata) #直接send就行 self._next_seq_id = 1 #下一个包seq_id = 1

接收数据

当mysql执行完SQL后, 就会返回相关的数据

流程

完整流程如下, 本次环境不考虑特殊情况

不考虑0xFF(error) 0xFB(字段太多) 0x00(无返回数据,就是成功)
image.png

所以实际上的过程就是如下

Client ->> Server : SQL
Server ->> Client : 字段数量
Server -->> Client : 具体的字段(每个字段一个包)
Server ->> Client : EOF(warnings)
Server -->> Client : 数据(每行数据一个包)
Server ->> Client : EOF(同上)

image.png
image.png

下面具体讲讲, 字段数量包就一个字节,没啥好说的, 本文也没有考虑字段多的情况

字段包

本文使用的ColumnDefinition41, 格式如下
image.png

EOF包

两个EOF包完全一样, 只是方便识别数据界限而已. 毕竟没有返回行数… 所以要EOF包来区分(也可能是使用OK包, 这取决于客户端的capabilities)

Payload固定5字节

header + Payload = mysql_pack

(header: 3字节大小 加 1字节seq_id)

image.png

数据行包

这个和binlog一样…

都是长度加数据, 然后放一堆, 长度取决于数据字段类型, 字段类型来自上面的字段包

PYTHON模拟

模拟客户端发送数据, 并解析server返回的数据

脚本见文末, 或者 https://github.com/ddcw/ddcw/blob/master/python/testpymysql.py

import testpymysql aa = testpymysql.mysql() aa.connect() aa.query('select aa.id as sb,aa.name from db1.t1 as aa limit 4') for x in aa.result(): print(x) print(aa.des_list)

image.png

总结

  1. 客户端发送SQL很简单, 直接把com_query+SQL发送到服务器上就行

  2. 服务器返回数据过程: 字段数量, 字段, EOF, 行… EOF

  3. 返回的数据行和binlog存储的是一样的, 都是长度+数据放一堆

  4. server返回的数据行数是由客户端统计的

5.默认不返回warning, 需要自己使用show warnings去获取

附源码

import hashlib import socket import struct import os #来自pymysql def _lenenc_int(i): if i < 0: raise ValueError("Encoding %d is less than 0 - no representation in LengthEncodedInteger" % i) elif i < 0xFB: return bytes([i]) elif i < (1 << 16): return b"\xfc" + struct.pack("<H", i) elif i < (1 << 24): return b"\xfd" + struct.pack("<I", i)[:3] elif i < (1 << 64): return b"\xfe" + struct.pack("<Q", i) else: raise ValueError("Encoding %x is larger than %x - no representation in LengthEncodedInteger"% (i, (1 << 64))) def native_password(password,salt): stage1 = hashlib.sha1(password).digest() stage2 = hashlib.sha1(stage1).digest() rp = hashlib.sha1(salt) rp.update(stage2) result = bytearray(rp.digest()) for x in range(len(result)): result[x] ^= stage1[x] return result def _read_lenenc(bdata,i): length = btoint(bdata[i:i+1]) i += 1 data = bdata[i:i+length] i += length return data,i def btoint(bdata,t='little'): return int.from_bytes(bdata,t) class mysql(object): def __init__(self): self.host = '192.168.101.21' self.port = 3308 self.user = 'root' self.password = '123456' def read_pack(self,): pack_header = self.rf.read(4) btrl, btrh, packet_seq = struct.unpack("<HBB", pack_header) pack_size = btrl + (btrh << 16) self._next_seq_id = (self._next_seq_id + 1) % 256 bdata = self.rf.read(pack_size) #也懒得考虑超过16MB的包了 return bdata def write_pack(self,data): #3字节长度, 1字节seq, data bdata = struct.pack("<I", len(data))[:3] + bytes([self._next_seq_id]) + data self.sock.sendall(bdata) self._next_seq_id = (self._next_seq_id + 1) % 256 def handshake(self,bdata): i = 0 #已经读取的字节数, 解析binlog的时候也是这么用的..... protocol_version = bdata[:1] #只解析10 server_end = bdata.find(b"\0", i) self.server_version = bdata[i:server_end] i = server_end + 1 self.thread_id = btoint(bdata[i:i+4]) i += 4 self.salt = bdata[i:i+8] i += 9 #还有1字节的filter, 没啥意义,就不保存了 self.server_capabilities = btoint(bdata[i:i+2]) i += 2 self.server_charset = btoint(bdata[i:i+1]) i += 1 self.server_status = btoint(bdata[i:i+2]) i += 2 self.server_capabilities |= btoint(bdata[i:i+2]) << 16 #往左移16位 为啥不把capability_flags_1和capability_flags_2和一起呢 i += 2 salt_length = struct.unpack('<B',bdata[i:i+1])[0] #懒得去判断capabilities & CLIENT_PLUGIN_AUTH了 salt_length = max(13,salt_length-8) #前面已经有8字节了 i += 1 i += 10 #reserved self.salt += bdata[i:i+salt_length] i += salt_length self.server_plugname = bdata[i:] def HandshakeResponse41(self,): client_flag = 3842565 #不含DBname #client_flag |= 1 << 3 charset_id = 45 #45:utf8mb4 33:utf8 #bdata = client_flag.to_bytes(4,'little') #其实应该最后在加, 毕竟还要判断很多参数, 可能还需要修改, 但是懒 bdata = struct.pack('<iIB23s',client_flag,2**24-1,charset_id,b'') bdata += self.user.encode() + b'\0' auth_password = native_password(self.password.encode(), self.salt[:20]) auth_response = _lenenc_int(len(auth_password)) + auth_password bdata += auth_response bdata += b"mysql_native_password" + b'\0' #本文有设置连接属性, 主要是为了方便观察 attr = {'_client_name':'ddcw_for_pymysql', '_pid':str(os.getpid()), "_client_version":'0.0.1',} #key长度+k+v长度+v connect_attrs = b"" for k, v in attr.items(): k = k.encode() connect_attrs += _lenenc_int(len(k)) + k v = v.encode() connect_attrs += _lenenc_int(len(v)) + v bdata += _lenenc_int(len(connect_attrs)) + connect_attrs self.write_pack(bdata) auth_pack = self.read_pack() #看看是否连接成功 if auth_pack[:1] == b'\0': print('OK',) else: print('FAILED',auth_pack) def query(self,sql): """不考虑SQL超过16MB情况""" # payload_length:3 sequence_id:1 payload:N # payload: com_query(0x03):1 sql:n bdata = struct.pack('<IB',len(sql)+1,0x03) #I:每个com_query的seq_id都从0开始,第4字节固定为0, 所以直接用I, +1:com_query占用1字节, 0x03:com_query bdata += sql.encode() self.sock.sendall(bdata) self._next_seq_id = 1 #下一个包seq_id = 1 def result(self): #https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_query_response_text_resultset_column_definition.html #Protocol::ColumnDefinition41 #字段数量 stat = self.read_pack() filed_count = struct.unpack('<B',stat)[0] #不考虑0xFF(error) 0xFB(字段太多) 0x00(无返回数据,就是成功) #字段描述(字段数据类型) des_list = [] for x in range(filed_count): i = 0 bdata = self.read_pack() catalog,i = _read_lenenc(bdata,i) schema,i = _read_lenenc(bdata,i) table,i = _read_lenenc(bdata,i) org_table,i = _read_lenenc(bdata,i) name,i = _read_lenenc(bdata,i) org_name,i = _read_lenenc(bdata,i) i += 1 #0x0c character_set = btoint(bdata[i:i+2]) i += 2 column_length = btoint(bdata[i:i+4]) i += 4 _type = btoint(bdata[i:i+1]) #只解析int和str, 之前解析binlog的时候还有date.... 算了 i += 1 flags = btoint(bdata[i:i+2]) i += 2 decimals = btoint(bdata[i:i+1]) i += 1 des_list.append([catalog,schema,table,org_table,name,org_name,character_set,column_length,_type,flags,decimals]) self.des_list = des_list bdata = self.read_pack() #EOF包 warnings = btoint(bdata[1:3]) row = [] while True: bdata = self.read_pack() if bdata[0:1] == b'\xfe': #EOF包 break _row = [] i = 0 for x in des_list: length = btoint(bdata[i:i+1]) #不考虑长字符 i += 1 _row.append(bdata[i:i+length]) #懒得做数据类型转换了 row.append(_row) print(f'warnings:{warnings} rows:{len(row)}') return row def connect(self): sock = socket.create_connection((self.host, self.port)) sock.settimeout(None) self.sock = sock self.rf = sock.makefile("rb") self._next_seq_id = 0 #解析server的握手包 bdata = self.read_pack() self.handshake(bdata) #握手.发账号密码 self.HandshakeResponse41()
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论