暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

MYSQL 协议 (3) 制作读写分离中间件

如果你看了前两章(连接协议解析,执行查询解析)的话, 而你又有点编程基础的话, 你应该就能制作一个简单的读写分离中间件了.

恰好我都会点点, 那就制作一个简单的读写分离中间件吧.

原理

由于还不会lex(你可以使用sqlparse), 所以就在sql里面添加hint来提示我们中间件 这条sql需要读分离. 暂不考虑事务情况. 就假设主从数据是完全一致的

大概流程如下, 就是自己监控一个端口, 当client连接上来的时候, 默认转发给 MYSQL RW(可读可写), 如果匹配到关键字, 比如:/ddcw_read/后, 就转发到MYSQL RO (只读,一般为从库)
image.png

设计过程

初始化服务

每个连接一个线程, 该线程再分出去两个线程, 一个监控client发来的数据, 另一个监控MYSQL RW发来的数据, 然后根据条件做转发.

完整代码见文末, 这只给伪代码演示

#MAIN
socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
accept_client_thread = Thread(target=self.accept_client,daemon=True)
accept_client_thread.start()
accept_client_thread.join()

#accept_client
conn, addr = self.socket_server.accept()
thread = Thread(target=self.handler,args=(conn,addr),daemon=True)
thread.start()

#handler
sock = socket.create_connection((self.w[0], self.w[1])) #连接后端 MYSQL RW
t1 = Thread(target=self.hashread,args=(conn,sock)) #监控client发来的包
t2 = Thread(target=self.hashread,args=(sock,conn,)) #监控mysql rw发来的包

转发数据

判断数据包类型和是否包含关键字符串

bdata.find(b'/*ddcw_read*/') #找到需要读均衡的包

mid = hash(time.time())%self.length
ts = self.ri[mid]
ts[0].sendall(bdata) #讲该包转到随机的后台MYSQL RO

data[4:5] == b'\xfe' #判断query包结束

有个小插曲, 我解析server发来的包, 和上一篇解析的完全一样, 但是转发到client就不行, client收到empty set, 然后断开连接了.

后来对比发现, 正常的包差一个EOF开头, EOF结尾多了两空白字符, 我也修改为这样后就可以了(其实就是客户端设的CLIENT_DEPRECATE_EOF ,就是使用OK代替EOF)…
image.png

测试

由于还是测试版本, 没得接口, 直接修改源码就是了

self.host 绑定的IP地址
self.port 绑定的端口
self.w 读写  MSYQL RW
self.r 仅读, MYSQL RO

image.png
由于使用了hint, 所以使用mysql命令的时候要加个 -c 或者 --comments

暂不支持ssl 所以也要 --skip-ssl (下一篇应该就是讲SSL了)

查询下server id, 发现每次查询(hash(time.time()))的不一样, 说明读分离成功了.
image.png

建一张表, 然后插入两条数据, 然后去从库删掉一掉数据, 再使用/ddcw_read/查询
image.png
image.png
image.png
发现数据是在 1条 和2条之间切换, 说明读写分离成功了的.

总结

mysql的读写分离中间件还是比较多的, 不过都并不是那么好用, 比如官方的mysql-router, 要使用端口来区分. 就不是那么滴方便. 自己写,虽然也能实现简单的读写分离, 但是功能差得太多.

不过写着玩还是不错的, 能增长见识. (会lex后就能做分布式了)

附源码

testpymysql.py 见上一章文末或者github

本次实验的 mysql_rw.py 见github 或者如下

就是根据上上章的mysql_joker改编的

import struct from threading import Thread import socket import time import testpymysql def btoint(bdata,t='little'): return int.from_bytes(bdata,t) def read_pack(rf): pack_header = rf.read(4) if len(pack_header) < 4: print(pack_header,' bye!') exit(2) btrl, btrh, packet_seq = struct.unpack("<HBB", pack_header) pack_size = btrl + (btrh << 16) bdata = rf.read(pack_size) if bdata.find(b'/*ddcw_read*/') == -1: return pack_header+bdata,False else: return pack_header+bdata,True class mrw(object): def __init__(self): self.host = '0.0.0.0' self.port = 3306 self.w = ('192.168.101.21',3308,) self.r = (('192.168.101.21',3308,'root','123456'), ('192.168.101.19',3306,'root','123456')) self.length = len(self.r) self.ri = [] for x in range(self.length): aa = testpymysql.mysql() aa.host = self.r[x][0] aa.port = self.r[x][1] aa.user = self.r[x][2] aa.password = self.r[x][3] aa.connect() self.ri.append([aa.sock,aa.rf]) def hashread(self,client_sock,server_sock,): rf = client_sock.makefile('rb') while True: bdata,status = read_pack(rf) #print('seq:',btoint(bdata[3:4]),bdata[4:5],bdata) if status: mid = hash(time.time())%self.length ts = self.ri[mid] ts[0].sendall(bdata) eof = 0 tdata = b'' seq = 1 while eof <2: data,status = read_pack(ts[1]) if data[4:5] == b'\xfe': eof += 1 if eof == 1: continue data = bytearray(data) data[3:4] = struct.pack('<B',seq) if eof == 2: data[0:3] = b'\x07\x00\x00' data += b'\x00\x00' #print('seq:',btoint(data[3:4]),data[4:5],data) client_sock.sendall(data) seq += 1 else: server_sock.sendall(bdata) def handler(self,conn,addr): #连接SERVER sock = socket.create_connection((self.w[0], self.w[1])) sock.settimeout(None) #1个监控conn发来的数据,然后转发, 一个监控server发来的数据, 然后转发 t1 = Thread(target=self.hashread,args=(conn,sock)) t2 = Thread(target=self.hashread,args=(sock,conn,)) t1.start() t2.start() t1.join() t2.join() def init(self): socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) socket_server.bind((self.host, self.port)) socket_server.listen(151) self.socket_server = socket_server accept_client_thread = Thread(target=self.accept_client,daemon=True) accept_client_thread.start() accept_client_thread.join() def accept_client(self,): while True: conn, addr = self.socket_server.accept() thread = Thread(target=self.handler,args=(conn,addr),daemon=True) thread.start() aa = mrw() aa.init()
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论