暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

[MYSQL] 主从连接协议(2)--GTID

原创 大大刺猬 2024-05-08
597

导读

之前有讲MYSQL连接协议, 也有讲过主从连接协议. 并附有相关python测试代码. 但对于主从连接的时候, GTID获取还是借用的现有的, 也就是没有做解析. 在我们解析了binlog之后. gtid信息就不在话下了. 格式就是PRE_GTID, 我这里就不再介绍了. 有兴趣的自己去看 https://www.modb.pro/db/1781217154309378048

GTID decode&encode

每次重写之前的功能, 都想着写得简单方便实用点. 于是就是有了gtid += struct.pack('<QQ',*[int(_x) for _x in y.split("-")]) 这种看起来不友好, 但又简单实用的写法了. 尽是些花里胡哨的 -_-

class GTID(object): def __init__(self,data): if isinstance(data,bytes): self.bdata = data else: self.data = data self.offset = 0 def read(self,n): data = self.bdata[self.offset:self.offset+n] self.offset += n return data def encode(self,tdata=None): data = self.data if tdata is None else tdata data = data.replace("\n","") gtid_number = 0 ldata = data.rstrip(",").split(",") gtid = struct.pack("<Q",len(ldata)) for x in ldata: gtid_number += 0 _data = x.split(":") uid = binascii.unhexlify(_data[0].replace('-', '')) gtid += uid + struct.pack('<Q',len(_data[1:])) for y in _data[1:]: gtid += struct.pack('<QQ',*[int(_x) for _x in y.split("-")]) return gtid def decode(self,): # @https://github.com/ddcw/pymysqlbinlog : pymysqlbinlog/gtid_event.py self.offset = 0 gtid_number = struct.unpack('<Q',self.read(8))[0] gtid_list = [] for x in range(gtid_number): server_uid_bdata = self.read(16) if server_uid_bdata == b'': break sid = str(uuid.UUID(bytes=server_uid_bdata)) group_gno_number = struct.unpack('<Q',self.read(8))[0] gtid_info = sid for y in range(group_gno_number): start = struct.unpack('<Q',self.read(8))[0] stop = struct.unpack('<Q',self.read(8))[0] gtid_info += f":{start}-{stop}" gtid_list.append(gtid_info) return ",".join([ str(x) for x in gtid_list ])

演示

基本上只是对上一次的脚本做补充, 用法没变, 我这里就直接演示了. 直接展示又好像少了点什么, 所以我就和 pymysqlbinlog 结合起来了, 只打印DML语句信息. (基本上只是多个导包而已).

import t20240508 aa = t20240508.repl() aa.server_id = 123456 aa.auto_position = True aa.connect() aa.request_dump("6d650f1f-ba4e-11ed-99ab-000c2980c11e:1-5") # 请求相关GTID. 包含结束的那个GTID值.(就是server端有个-1操作, 解析binlog的时候代码里面也有写 -1) aa.parse_event()

image.png

附源码

我这里就只给t20240508.py的源码(大部分没变), 其它的之前都发过.

import testpymysql import struct import uuid import binascii from pymysqlbinlog.pymysqlbinlog import mysqlbinlog from pymysqlbinlog.row_event import * def _read_lenenc(bdata,i): length = btoint(bdata[i:i+1]) i += 1 data = bdata[i:i+length] i += length return data,i def event_header(bdata): timestamp, event_type, server_id, event_size, log_pos, flags = struct.unpack("<LBLLLh",bdata[0:19]) return {"timestamp":timestamp,'event_type':event_type,'server_id':server_id,'event_size':event_size,'log_pos':log_pos,'flags':flags,} class GTID(object): def __init__(self,data): if isinstance(data,bytes): self.bdata = data else: self.data = data self.offset = 0 def read(self,n): data = self.bdata[self.offset:self.offset+n] self.offset += n return data def encode(self,tdata=None): data = self.data if tdata is None else tdata data = data.replace("\n","") gtid_number = 0 ldata = data.rstrip(",").split(",") gtid = struct.pack("<Q",len(ldata)) for x in ldata: gtid_number += 0 _data = x.split(":") uid = binascii.unhexlify(_data[0].replace('-', '')) gtid += uid + struct.pack('<Q',len(_data[1:])) for y in _data[1:]: gtid += struct.pack('<QQ',*[int(_x) for _x in y.split("-")]) return gtid def decode(self,): # @https://github.com/ddcw/pymysqlbinlog : pymysqlbinlog/gtid_event.py self.offset = 0 gtid_number = struct.unpack('<Q',self.read(8))[0] gtid_list = [] for x in range(gtid_number): server_uid_bdata = self.read(16) if server_uid_bdata == b'': break sid = str(uuid.UUID(bytes=server_uid_bdata)) group_gno_number = struct.unpack('<Q',self.read(8))[0] gtid_info = sid for y in range(group_gno_number): start = struct.unpack('<Q',self.read(8))[0] stop = struct.unpack('<Q',self.read(8))[0] gtid_info += f":{start}-{stop}" gtid_list.append(gtid_info) return ",".join([ str(x) for x in gtid_list ]) class repl(testpymysql.mysql,mysqlbinlog): def __init__(self,*args,**kwargs): #super().__init__(*args,**kwargs) self.filename = "None" testpymysql.mysql.__init__(self,) mysqlbinlog.__init__(self,filename="xx") self.server_id = 12345678 #uint32 self.lport = 6666 self.auto_position = False #Ture:gtid self.master_heartbeat_period = 30*1000000000 #设置心跳, 单位纳秒 self.log_file = 'm3308.001037' #其实可以show master status的, 但是我懒得去查了... self.log_pos = 4 #懒得去计算gtid了, 直接用我的环境的现成的, 计算方式可以参考: pymysqlreplication 的 gtid.py #使用: gtid.GtidSet(GTID_STR).encode() self.ROLLBACK = False self.checksum = None #sql/rpl_slave.cc register_slave_on_master def register_slave(self): """ COM_FLAG: 1 (COM_REGISTER_SLAVE:21) server_id: 4 host,user,password 0 port: 2 rpl_recovery_rank:4 (0) master_id:4 (0) """ bdata = struct.pack('<BLBBBHLL',21,self.server_id,0,0,0,self.lport,0,0) self._next_seq_id = 0 #每个com都清零 self.write_pack(bdata) rpack = self.read_pack() return True if rpack[0:1] == b'\x00' else False #sql/rpl_slave.cc request_dump def request_dump(self,gtid): self.query(f'SET @master_heartbeat_period = {self.master_heartbeat_period}') #master_binlog_checksum, slave_uuid 算了 bdata = b'' if self.auto_position: """ COM_FLAG: 1 (COM_BINLOG_DUMP_GTID:30) binlog_flags: 2 server_id: 4 BINLOG_NAME_INFO_SIZE: 4 BINLOG_NAME: POS: 8 gtid_size: 4 gtid: """ #regiest first aa = GTID(gtid) self.bgtid = aa.encode() bdata = struct.pack('<BHLLLQL',30,0,self.server_id,4,0,self.log_pos,len(self.bgtid)) + self.bgtid self.register_slave() else: """ COM_FLAG: 1 (COM_BINLOG_DUMP:18) binlog-pos: 4 flags: 2 server_id: 4 binlog-filename: """ self.register_slave() bdata = struct.pack('<BLHL',18,self.log_pos,0,self.server_id) + self.log_file.encode() self._next_seq_id = 0 self.write_pack(bdata) def read_event(self,): bdata = self.read_pack() if bdata[0:1] == b'\x00' and len(bdata) > 20: return bdata[1:] def event(self): while True: bdata = self.read_pack() if bdata[0:1] == b'\x00' and len(bdata) > 20: yield event_header(bdata[1:20]) def parse_event(self): while True: bdata = self.read_pack() if bdata[0:1] == b'\x00' and len(bdata) > 20: bdata = bdata[1:] event_type = struct.unpack('>B',bdata[4:5])[0] if event_type == 19: aa = tablemap_event(bdata=bdata[19:],event_header=bdata[:19]) aa.init() bdata = self.read_pack()[1:] bb = row_event(bdata=bdata[19:],tablemap=aa,event_header=bdata[:19]) bb.init() print(bb.read_sql()) def query(self,sql): self._next_seq_id = 0 bdata = struct.pack('<B',3) + sql.encode() self.write_pack(bdata) #return self.result() def close(self): #self._next_seq_id = 0 self.write_pack(struct.pack('<B',1))

其它

写本文的时候发生的事情, 记录一下.
之前环境迁移, 今天验证的时候,发现个别文件系统的部分文件目录丢失了, 在该文件系统的lost+found目录找到了相关文件和目录, 但未发现删除之类的信息, 虽然恢复起来很麻烦. 最终还是恢复成功了. 迁移前一定要做好备份

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论