暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

MYSQL 协议 (6) caching_sha2_password

本文主要讲caching_sha2_password的加密原理和连接过程(比mysql_native_password复杂一些)

mysql_native_password之前就已经讲过了.

本文不包含空密码场景

caching_sha2_password 加密原理

就是对密码做hash, 返回最后一次和第一次的异或, (最后一次加salt, 防止重返)

和mysql_native_password差不多, 只不过改用了sha2算法.

代码如下

def sha2_password(password,salt):
        stage1 = hashlib.sha256(password).digest()
        stage2 = hashlib.sha256(stage1).digest()
        stage3 = hashlib.sha256(stage2+salt).digest()

        result = bytearray(stage3)
        for x in range(len(result)):
                result[x] ^= stage1[x]
        return result

caching_sha2_password 源代码在sql/auth/sha2_password_common.cc

  /* SHA2(src) => digest_stage1 */
  if (m_digest_generator->update_digest(m_src.c_str(), m_src.length()) ||
      m_digest_generator->retrieve_digest(digest_stage1, m_digest_length)) {
    DBUG_PRINT("info", ("Failed to generate digest_stage1: SHA2(src)"));
    return true;
  }

  /* SHA2(digest_stage1) => digest_stage2 */
  m_digest_generator->scrub();
  if (m_digest_generator->update_digest(digest_stage1, m_digest_length) ||
      m_digest_generator->retrieve_digest(digest_stage2, m_digest_length)) {
    DBUG_PRINT("info",
               ("Failed to generate digest_stage2: SHA2(digest_stage1)"));
    return true;
  }

  /* SHA2(digest_stage2, m_rnd) => scramble_stage1 */
  m_digest_generator->scrub();
  if (m_digest_generator->update_digest(digest_stage2, m_digest_length) ||
      m_digest_generator->update_digest(m_rnd.c_str(), m_rnd.length()) ||
      m_digest_generator->retrieve_digest(scramble_stage1, m_digest_length)) {
    DBUG_PRINT("info", ("Failed to generate scrmable_stage1: "
                        "SHA2(digest_stage2, m_rnd)"));
    return true;
  }

  /* XOR(digest_stage1, scramble_stage1) => scramble */
  for (uint i = 0; i < m_digest_length; ++i)
    scramble[i] = (digest_stage1[i] ^ scramble_stage1[i]);

mysql_native_password源码在 sql/auth/password.cc

void scramble(char *to, const char *message, const char *password) {
  uint8 hash_stage1[SHA1_HASH_SIZE];
  uint8 hash_stage2[SHA1_HASH_SIZE];

  /* Two stage SHA1 hash of the password. */
  compute_two_stage_sha1_hash(password, strlen(password), hash_stage1,
                              hash_stage2);

  /* create crypt string as sha1(message, hash_stage2) */;
  compute_sha1_hash_multi((uint8 *)to, message, SCRAMBLE_LENGTH,
                          (const char *)hash_stage2, SHA1_HASH_SIZE);
  my_crypt(to, (const uchar *)to, hash_stage1, SCRAMBLE_LENGTH); /*while (s1 < s1_end) *to++ = *s1++ ^ *s2++;*/
}

连接过程

caching 的意思是缓存, 也就是server会把账号密码信息缓存在内存中, 方便下次快速连接.

所以caching_sha2_password 分为两种情况 快速认证 和 完整认证 (着重讲)
下图均未包含switch request

switch request其实就是把加密后的密码发给server即可

scrambled = sha2_password(self.password.encode(),auth_pack[auth_pack.find(b'\x00')+1:])
self.write_pack(scrambled)
auth_pack = self.read_pack()

快速认证

和mysql_native_password差不多.

注: 密码认证失败, 就走完整认证
image.png

完整认证

多了个使用公钥加密的过程(实际上还会请求公钥)
image.png

当客户端发送加密后的密码(第一个包)给server时, server返回值如下:

0xFE 表示交换认证(再把密码加密下发过去(只要密码了))
0x01 额外认证(插件)
0x00 OK

当收到0xFE或者0x01时(第二个字段如下值)

b'\x03' fast
b'\x04' full  如果是SSL/SOCK/shard_MEM 就不需要公钥加密了,明文就行

如果客户端需要公钥的话, 发个2给server即可 request_public_key = ‘\2’
image.png

测试

本次只演示完整认证.

直接执行如下代码即可, 我基本上都封装好了. 源码见文末

import mysql_password
aa = mysql_password.mysql()
aa.user = 'u1'
aa.password = '123456'
aa.connect()
#aa.pubk

image.png
我们去server看下

确实是没问题的, 我们再看下 Public key
image.png
image.png
和我们收到的public key 是一样的.

总结

  1. caching_sha2_password 有两种认证模式, 快速认证 和 完整认证

  2. 完整认证(full) 在未使用ssl/sock/共享内存的情况 会使用server的公钥对密码加密

  3. switch auth (0xFE) 就是只是把加密的密码发过去即可.

AuthSwitchRequest结果如下(回报没得结构,直接发加密后的password)
image.png

  1. 认证成功之后, server会缓存相关账号信息(官方说的, 实际上并没有找到, 下次使用gdb看看)

源码

源码有点长哈(基本上都是用之前的修修改改)…

rsa加密使用的pymysql的. 因为不属于本文的内容

import hashlib import struct import socket import os #来自pymysql from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric import padding def btoint(bdata,t='little'): return int.from_bytes(bdata,t) #来自pymysql def _lenenc_int(i): if i < 0: raise ValueError("Encoding %d is less than 0 - no representation in LengthEncodedInteger" % i) elif i < 0xFB: return bytes([i]) elif i < (1 << 16): return b"\xfc" + struct.pack("<H", i) elif i < (1 << 24): return b"\xfd" + struct.pack("<I", i)[:3] elif i < (1 << 64): return b"\xfe" + struct.pack("<Q", i) else: raise ValueError("Encoding %x is larger than %x - no representation in LengthEncodedInteger"% (i, (1 << 64))) #就是做个异或 #来自Pymysql def _xor_password(password, salt): salt = bytearray(salt[:20]) password = bytearray(password) for i in range(len(password)): password[i] ^= salt[i%len(salt)] return bytes(password) #来自pymysql def sha2_rsa_encrypt(password, salt, public_key): message = _xor_password(password + b"\0", salt) rsa_key = serialization.load_pem_public_key(public_key, default_backend()) return rsa_key.encrypt( message, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None, ), ) def native_password(password,salt): stage1 = hashlib.sha1(password).digest() stage2 = hashlib.sha1(stage1).digest() rp = hashlib.sha1(salt) rp.update(stage2) result = bytearray(rp.digest()) for x in range(len(result)): result[x] ^= stage1[x] return result def sha2_password(password,salt): stage1 = hashlib.sha256(password).digest() stage2 = hashlib.sha256(stage1).digest() stage3 = hashlib.sha256(stage2+salt).digest() result = bytearray(stage3) for x in range(len(result)): result[x] ^= stage1[x] return result def parse_handshake(bdata): i = 0 protocol_version = bdata[:1] server_end = bdata.find(b"\0", i) i = server_end + 1 thread_id = btoint(bdata[i:i+4]) i += 4 salt = bdata[i:i+8] i += 9 server_capabilities = btoint(bdata[i:i+2]) i += 2 server_charset = btoint(bdata[i:i+1]) i += 1 server_status = btoint(bdata[i:i+2]) i += 2 server_capabilities |= btoint(bdata[i:i+2]) << 16 i += 2 salt_length = struct.unpack('<B',bdata[i:i+1])[0] salt_length = max(13,salt_length-8) i += 11 salt += bdata[i:i+salt_length] i += salt_length server_plugname = bdata[i:] return salt class mysql(object): def __init__(self): self.host = '192.168.101.21' self.port = 3314 self.user = 'u1' self.password = '123456' def read_pack(self,): pack_header = self.rf.read(4) btrl, btrh, packet_seq = struct.unpack("<HBB", pack_header) pack_size = btrl + (btrh << 16) self._next_seq_id = (self._next_seq_id + 1) % 256 bdata = self.rf.read(pack_size) return bdata def write_pack(self,data): bdata = struct.pack("<I", len(data))[:3] + bytes([self._next_seq_id]) + data self.sock.sendall(bdata) self._next_seq_id = (self._next_seq_id + 1) % 256 def handshake(self,bdata): i = 0 #已经读取的字节数, 解析binlog的时候也是这么用的..... protocol_version = bdata[:1] #只解析10 server_end = bdata.find(b"\0", i) self.server_version = bdata[i:server_end] i = server_end + 1 self.thread_id = btoint(bdata[i:i+4]) i += 4 self.salt = bdata[i:i+8] i += 9 #还有1字节的filter, 没啥意义,就不保存了 self.server_capabilities = btoint(bdata[i:i+2]) i += 2 self.server_charset = btoint(bdata[i:i+1]) i += 1 self.server_status = btoint(bdata[i:i+2]) i += 2 self.server_capabilities |= btoint(bdata[i:i+2]) << 16 #往左移16位 为啥不把capability_flags_1和capability_flags_2和一起呢 i += 2 salt_length = struct.unpack('<B',bdata[i:i+1])[0] #懒得去判断capabilities & CLIENT_PLUGIN_AUTH了 salt_length = max(13,salt_length-8) #前面已经有8字节了 i += 1 i += 10 #reserved self.salt += bdata[i:i+salt_length] i += salt_length self.server_plugname = bdata[i:] def HandshakeResponse41(self,): #client_flag = 3842565 #不含DBname client_flag = 33531525#不含DBname #client_flag |= 1 << 3 charset_id = 45 #45:utf8mb4 33:utf8 #bdata = client_flag.to_bytes(4,'little') #其实应该最后在加, 毕竟还要判断很多参数, 可能还需要修改, 但是懒 bdata = struct.pack('<iIB23s',client_flag,2**24-1,charset_id,b'') bdata += self.user.encode() + b'\0' auth_password = native_password(self.password.encode(), self.salt[:20]) auth_response = _lenenc_int(len(auth_password)) + auth_password bdata += auth_response bdata += b"mysql_native_password" + b'\0' #本文有设置连接属性, 主要是为了方便观察 attr = {'_client_name':'ddcw_for_pymysql', '_pid':str(os.getpid()), "_client_version":'0.0.1',} #key长度+k+v长度+v connect_attrs = b"" for k, v in attr.items(): k = k.encode() connect_attrs += _lenenc_int(len(k)) + k v = v.encode() connect_attrs += _lenenc_int(len(v)) + v bdata += _lenenc_int(len(connect_attrs)) + connect_attrs self.write_pack(bdata) #0xFE 交换认证 https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_connection_phase_packets_protocol_old_auth_switch_request.html #0x01 额外认证 #0x00 OK #偷懒, 懒得去判断client_flag了 auth_pack = self.read_pack() if auth_pack[:1] == b'\0': print('OK',auth_pack) elif auth_pack[:1] == b'\xfe': #switch request print('hava switch request') if auth_pack.find(b'caching_sha2_password') < 0: print('仅测试caching_sha2_password, 但当前是:',auth_pack[1:auth_pack.find(b'\x00')]) return False scrambled = sha2_password(self.password.encode(),auth_pack[auth_pack.find(b'\x00')+1:]) #salt是剩下的部分 self.write_pack(scrambled) auth_pack = self.read_pack() print(auth_pack) self.caching_sha2_password_auth(auth_pack) elif auth_pack[:1] == b'\x01': self.caching_sha2_password_auth(auth_pack) else: print('FAILED',auth_pack) def caching_sha2_password_auth(self,auth_pack): if auth_pack[1:2] == b'\x03': #fast bdata = self.read_pack() #ok pack print('fast auth success.',bdata) elif auth_pack[1:2] == b'\x04': #full #如果是SSL/socket/shard_mem就直接发送密码(不需要加密了) TODO self.write_pack(b'\x02') #要公钥 bdata = self.read_pack() #server发来的公钥 pubk = bdata[1:] #第一字节是extra_auth 而且肯定是 0x01 #print('bdata',bdata) self.pubk = pubk password = sha2_rsa_encrypt(self.password.encode(), self.salt, pubk) self.write_pack(password) authpack = self.read_pack() #看看是否成功 print('full auth',authpack) else: print('???',auth_pack) def query(self,sql): """不考虑SQL超过16MB情况""" # payload_length:3 sequence_id:1 payload:N # payload: com_query(0x03):1 sql:n bdata = struct.pack('<IB',len(sql)+1,0x03) #I:每个com_query的seq_id都从0开始,第4字节固定为0, 所以直接用I, +1:com_query占用1字节, 0x03:com_query bdata += sql.encode() self.sock.sendall(bdata) self._next_seq_id = 1 #下一个包seq_id = 1 def connect(self): self._next_seq_id = 0 sock = socket.create_connection((self.host, self.port)) sock.settimeout(None) self.sock = sock self.rf = sock.makefile("rb") bdata = self.read_pack() self.handshake(bdata) self.HandshakeResponse41()
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论