暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

MYSQL 协议 (7) 主从协议 并使用PYTHON模拟

原创 大大刺猬 2023-04-23
944

MYSQL的主从应该是使用得最多的架构, 使用也很简单, 就change master to xxx 然后start 就可以了, 但是你知道原理吗?

写在前面

如果你阅读了我之前写的那个 MYSQL流量镜像, 那么你就可以看到mysql连接过程如下:

就是连接mysql之后, 设置了一些变量, 然后发送了两个特殊的包. 是不是很简单. 下面我们就来详细介绍下

ps: 其实这个流量镜像脚本还可以用来当general log使用(仅部分连接的流量日志) -_-
image.png
image.png

主从连接过程

连接上mysql服务器后, 都是request_dump(sql/rpl_slave.cc)函数通知主库我是从库.

主从连接, 分两种情况, 一种是基于gtid(MASTER_AUTO_POSITION = 1)的, 另一种是指定log_filename,log_pos的, 推荐使用第一种, 但第二种更简单. 本文两种都演示.

先设置一些参数, 比如master_heartbeat_period(心跳间隔,单位纳秒),master_binlog_checksum,slave_uuid

同时也会查询一些服务器信息, 比如SERVER_ID等. 但本次环境就不整那么多了.

client ->>  server : set header_beat等
server ->>  client : ok
client -->>  server : COM_REGISTER_SLAVE
server -->>  client : ok
client ->>  server : COM_BINLOG_DUMP/COM_BINLOG_DUMP_GTID
server -->> client: EVENT(含Heartbeat_event)

下图 省略连接过程,(也不考虑失败情况) 要看账号认证过程, 请看之前的文章: mysql连接协议解析
image.png

主从相关包结构

主要就是COM_BINLOG_DUMP,COM_BINLOG_DUMP_GTID,COM_REGISTER_SLAVE

Heartbeat_event 比较简单, 就是个event_header加个binlog_filename 就没了…

COM_BINLOG_DUMP

这个包用于非gtid模式, 告诉服务器, 我是dump线程, 结构如下:

源码在:(sql/rpl_slave.cc)request_dump

image.png

COM_BINLOG_DUMP_GTID

用于gtid模式下,结构如下:

源码在:(sql/rpl_slave.cc)request_dump

image.png

COM_REGISTER_SLAVE

注册主从的(可选). 多数字段都是0填充的… 不要也罢 -_-

源码在: sql/rpl_slave.cc register_slave_on_master
image.png

PYTHON模拟

本次模拟就不写relay log了, print出来就行, event也不全解析了, 就解析下header就行

gtid

就是 MASTER_AUTO_POSITION = 1 的情况

import testpymysqlreplication
aa = testpymysqlreplication.repl()
aa.server_id = 123456
aa.auto_position = True
aa.connect()
aa.request_dump()
for x in aa.event():
	print(x)

image.png
image.png

再来插入条数据瞧瞧

也是没得问题的

image.png
image.png
event_type为27的就是 心跳包

非GTID

使用和gtid差不多, 只是auto_position设置为False

import testpymysqlreplication
aa = testpymysqlreplication.repl()
aa.server_id = 123456
aa.log_file = 'm3308.001037'
aa.log_pos = 4
aa.auto_position = False
aa.connect()
aa.request_dump()
for x in aa.event():
	print(x)

image.png

总结

  1. mysql主从分两种情况, 有gtid和无gtid情况, 分别对应MASTER_AUTO_POSITION = 1和MASTER_AUTO_POSITION = 0

  2. 执行完request_dump后, 从库就只需要等主库发送就行了

  3. 如果从库长时间未收到主库的心跳包, 就认为主库挂了. 如果24小时业务都频繁的话, 可以不要心跳包(其实30秒一个流量也不多…)

注:每个command都要重置seq为0

PYTHON源码

本次源码没有解析GTID, 使用的mysql_monitor脚本得到的gtid信息. 如果要解析gtid的话, 可以参考pymysqlreplication的gtid.py 也可以参考官方文档

testpymysql 脚本是之前解析mysql连接的时候的, 也可以使用pymysql的

import testpymysql import struct def _read_lenenc(bdata,i): length = btoint(bdata[i:i+1]) i += 1 data = bdata[i:i+length] i += length return data,i def event_header(bdata): timestamp, event_type, server_id, event_size, log_pos, flags = struct.unpack("<LBLLLh",bdata[0:19]) return {"timestamp":timestamp,'event_type':event_type,'server_id':server_id,'event_size':event_size,'log_pos':log_pos,'flags':flags,} class repl(testpymysql.mysql): def __init__(self,*args,**kwargs): super().__init__(**kwargs) self.server_id = 12345678 #uint32 self.lport = 6666 self.auto_position = False #Ture:gtid self.master_heartbeat_period = 30*1000000000 #设置心跳, 单位纳秒 self.log_file = 'm3308.001037' #其实可以show master status的, 但是我懒得去查了... self.log_pos = 4 #懒得去计算gtid了, 直接用我的环境的现成的, 计算方式可以参考: pymysqlreplication 的 gtid.py #使用: gtid.GtidSet(GTID_STR).encode() self.bgtid = b'\x06\x00\x00\x00\x00\x00\x00\x00N\x08\x1c\xcb\xb0\xc9\x11\xed\xbf\x10\x00\x0c)\x938\xcc\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00me\x0f\x1f\xbaN\x11\xed\x99\xab\x00\x0c)\x80\xc1\x1e\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00Eg\x02\x00\x00\x00\x00\x00z\xb0f\xef\xc1\xbe\x11\xec\x92\xdd\x00\x0c)\x80\xc1\x1e\x03\x00\x00\x00\x00\x00\x00\x00\x13\n\x00\x00\x00\x00\x00\x00\x19\n\x00\x00\x00\x00\x00\x00\x8c\n\x00\x00\x00\x00\x00\x00\x8d\n\x00\x00\x00\x00\x00\x00\xae1\x80\x01\x00\x00\x00\x00\xaf1\x80\x01\x00\x00\x00\x00\x90\xbd\xfb\xb7\xcb\xe2\x11\xec\xa8p\x00\x0c)\x80\xc1\x12\x01\x00\x00\x00\x00\x00\x00\x00\xae1\x80\x01\x00\x00\x00\x00\xaf1\x80\x01\x00\x00\x00\x00\x90\xbd\xfb\xb7\xcb\xe2\x11\xec\xa8p\x00\x0c)\x80\xc1\x1e\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\xa9\xbb\xd7\x00\x00\x00\x00\x00\xae1\x80\x01\x00\x00\x00\x00\xaf1\x80\x01\x00\x00\x00\x00\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\xf7R\x04\x00\x00\x00\x00\x00' #sql/rpl_slave.cc register_slave_on_master def register_slave(self): """ COM_FLAG: 1 (COM_REGISTER_SLAVE:21) server_id: 4 host,user,password 0 port: 2 rpl_recovery_rank:4 (0) master_id:4 (0) """ bdata = struct.pack('<BLBBBHLL',21,self.server_id,0,0,0,self.lport,0,0) self._next_seq_id = 0 #每个com都清零 self.write_pack(bdata) rpack = self.read_pack() return True if rpack[0:1] == b'\x00' else False #sql/rpl_slave.cc request_dump def request_dump(self): self.query(f'SET @master_heartbeat_period = {self.master_heartbeat_period}') #master_binlog_checksum, slave_uuid 算了 bdata = b'' if self.auto_position: """ COM_FLAG: 1 (COM_BINLOG_DUMP_GTID:30) binlog_flags: 2 server_id: 4 BINLOG_NAME_INFO_SIZE: 4 BINLOG_NAME: POS: 8 gtid_size: 4 gtid: """ #regiest first bdata = struct.pack('<BHLLLQL',30,0,self.server_id,4,0,self.log_pos,len(self.bgtid)) + self.bgtid self.register_slave() else: """ COM_FLAG: 1 (COM_BINLOG_DUMP:18) binlog-pos: 4 flags: 2 server_id: 4 binlog-filename: """ self.register_slave() bdata = struct.pack('<BLHL',18,self.log_pos,0,self.server_id) + self.log_file.encode() self._next_seq_id = 0 self.write_pack(bdata) def event(self): while True: bdata = self.read_pack() if bdata[0:1] == b'\x00' and len(bdata) > 20: yield event_header(bdata[1:20]) def query(self,sql): self._next_seq_id = 0 bdata = struct.pack('<B',3) + sql.encode() self.write_pack(bdata) #return self.result() def close(self): #self._next_seq_id = 0 self.write_pack(struct.pack('<B',1))
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论