暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

MYSQL 协议 (4) 在MYSQL里面再连接MYSQL

原创 大大刺猬 2023-04-23
210

写在前面

运维的时候有时候需要连接多个mysql, 一般是选用多个窗口来做, 当然也有图形化的客户端软件.

本文使用一个简单的方法: 在mysql里面连接Mysql. 听起来是不是有点怪

原理

  1. 伪造一个server

  2. 客户端连接到这个server上, 然后转发客户端的流量到真实的server

  3. 当接收到特殊语句 比如: /ddcw_switch/select root:123456@192.168.101.19:3306;时, 就连接到指定的新的server, 然后返回OK给客户端.

  4. 客户端执行的新的查询就会被 中间件 发往新的server

测试

修改参数, 并启动脚本

基本上都是根据之前的脚本修修改改…

指定监听的端口, 和真实是mysql服务器(默认连接的服务), 不需要账号密码, 因为是客户端提供, 这里只负责转发
image.png
image.png

客户端连接使用

–skip-ssl 还没实现ssl

-c 传递注释到server

mysql -h192.168.101.21 -P3306 -p123456 --skip-ssl -c

image.png
发现数据确实是新服务器的了, 说明这个功能是正常的.

总结

  1. 发现能解析mysql连接协议之后, 就能做很多事情了, 比如上次的读写分离, 这次的mysql里面连接mysql, 还可以做流量镜像, 审计等

  2. 我是专门使用的一个线程去处理client发来的数据, 再来个线程去处理发给mysql的数据的. 通信使用的是Queue

  3. 运维的时候可能有用吧, 毕竟在一个窗口就能连接多个数据库.

待改进: 可以查询多个数据库的结果汇总在一起, 运维就更方便了(就像分布式数据库那样)

附源码

testpymysql.py见上一章. 需要修改下client_flag 加个CLIENT_DEPRECATE_EOF, 因为客户端是使用的CLIENT_DEPRECATE_EOF, 我只是懒得去判断了.

mysql_switch.py如下

import struct from threading import Thread from multiprocessing import Process, Queue import socket import time import testpymysql import signal import os,sys def btoint(bdata,t='little'): return int.from_bytes(bdata,t) def read_pack(rf): pack_header = rf.read(4) if len(pack_header) < 4: print(pack_header,' bye!') exit(2) btrl, btrh, packet_seq = struct.unpack("<HBB", pack_header) pack_size = btrl + (btrh << 16) bdata = rf.read(pack_size) if bdata.find(b'/*ddcw_switch*/') != -1: return pack_header+bdata,True else: return pack_header+bdata,False class mrw(object): def __init__(self): self.host = '0.0.0.0' self.port = 3306 self.w = ('192.168.101.21',3308,) #专门发信息给客户端... def sendtoclient(self,client_sock,q): while True: bdata = q.get() print('send: ',btoint(bdata[3:4]),bdata) client_sock.sendall(bdata) print('send:fin',btoint(bdata[3:4]),bdata) def handler_server(self,rf,q): def sc(*args,**kwargs): print("i'm died....") sys.exit(0) #signal.signal(signal.SIGTERM, sc) while True: bdata,status = read_pack(rf) #print('send: ', btoint(bdata[3:4]), bdata) q.put(bdata) def handler(self,conn,addr): #连接SERVER sock = socket.create_connection((self.w[0], self.w[1])) sock.settimeout(None) srf = sock.makefile('rb') crf = conn.makefile('rb') queue = Queue() t1 = Process(target=self.sendtoclient,args=(conn,queue)) t1.start() t2 = Process(target=self.handler_server,args=(srf,queue,)) t2.start() #pid = t2.pid while True: bdata,status = read_pack(crf) print('recived:', btoint(bdata[3:4]), bdata) if status: data = bdata[6:].decode().replace('"','').replace("'","").split(';')[0].split() dl = len(data) data = data[dl-1] user_password = data.split('@')[0] host_port = data.split('@')[1] nm = testpymysql.mysql() nm.host = host_port.split(':')[0] nm.port = host_port.split(':')[1] nm.user = user_password.split(':')[0] nm.password = user_password.split(':')[1] nm.connect() queue.put(b'\x01\x00\x00\x01\x01') queue.put(b'\x1c\x00\x00\x02\x03def\x00\x00\x00\x06status\x00\x0c!\x00\x15\x00\x00\x00\xfd\x01\x00\x1f\x00\x00') queue.put(b'\x08\x00\x00\x03\x07Success') queue.put(b'\x07\x00\x00\x04\xfe\x00\x00\x02\x00\x00\x00') sock = nm.sock t2.terminate() t2 = Thread(target=self.handler_server,args=(nm.rf,queue)) t2.start() else: sock.sendall(bdata) print('send to server') def init(self): socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) socket_server.bind((self.host, self.port)) socket_server.listen(151) self.socket_server = socket_server accept_client_thread = Thread(target=self.accept_client,) accept_client_thread.start() accept_client_thread.join() def accept_client(self,): while True: conn, addr = self.socket_server.accept() thread = Process(target=self.handler,args=(conn,addr),) thread.start() aa = mrw() aa.init()
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论