暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

MySQL数据传输的注意事项

johopig 2021-12-27
1134


  • 背景

  • net_buffer_length

  • max_allowed_packet

  • 作用

  • 相关源码

  • 注意事项

  • 总结


背景

业务方迁移时,总的数据量大小只有300多M,但是迁移时MySQL server提示packet超过了max_allowed_packet
,而在测试迁移时10G的压测数据反而没有发生任何问题?

该问题最终的解决方案很简单,调整合适的max_allowed_packet
的大小、新建连接进行操作,

但我们可以借此深入了解一波MySQL Packet相关知识,比如MySQL是在哪里对包大小做了限制等等,顺便摸一下鱼~

net_buffer_length

版本mysqldumpmysql clientmysql server
5.7default:1MB
max:16MB
default:16KBdefault:16KB
max:1MB
8.0default:1MBdefault:16KBdefault:16KB

max_allowed_packet

max allowed packet设置MySQL Server和Client之间任何单个消息的大小的上限,包括副本(主从或者MGR)

版本mysqldumpmysql clientmysql server
5.5default:24MBdefault:16MBdefault:1MB
5.7default:24MBdefault:16MBdefault:4MB
8.0default:24MBdefault:16MBdefault:64MB

这里需要提到的一个MySQL的API函数:mysql_stmt_send_long_data()

该函数的作用是(使用预处理功能时)允许将参数数据分块多次的发送给服务器,例如,当blob
或者text
的大小超过max_allowed_packet
的大小。前提是该列类型必须是TEXT
BLOB
数据类型

作用

上述两个参数的作用为:每个包发送到网络或者从网络读包都会先把数据包保存在net->buff里,待到net->buff满了或者一次命令结束才会通过socket发出给对端。net->buff有个初始大小(net->max_packet),会随读取数据的增多而扩展直到max_allowed_packet
的大小

相关源码

MySQL Packet结构

MySQL 的客户端和服务端交互是以数据包(Packet) 为单位进行的, 每个包的大小长度有限制, 最长为 2^24−1 个字节(即 16MB), 若包长度过大, 则客户端需要自行将包分片, 使得每段的长度在 MySQL 包的最大长度之下, MySQL Packet 由 Header
Body
组成,。

Header
包含两个字段: 包长度(payload_length
)、序列号(sequence_id
), Body
则是包的主体部分, 它的长度由 Header
中的 payload_length
字段指示。

其中payload_length
占3字节,sequence_id
占1字节。

所以,MySQL一个Pakcet的最大长度为 16M + 4 字节,大于 max_packet_size(2^24-1字节)的数据都会被拆包发送

接收方

my_net_read()

大包处理,最后都会循环调用net_read_packet()直到满足退出条件

ulong my_net_read(NET *net) {
  size_t len;
  /* turn off non blocking operations */
  if (!vio_is_blocking(net->vio)) vio_set_blocking_flag(net->vio, true);

  // 是否启用压缩
  if (net->compress)
    net_read_compressed_packet(net, len);
  else
    net_read_uncompressed_packet(net, len);

  return static_cast<ulong>(len);
}

两者类似,我们看一下读取未压缩的packet过程

net_read_uncompressed_packet()

将一个数据包读入net->buff + net->where_b,如果是多包报文的第一个报文(由数据包的长度= 0xffffff表示[16M]),则读取并拼接所有子包。

static void net_read_uncompressed_packet(NET *net, size_t &len) {
  size_t complen;
  assert(!net->compress);
  // 第一次读取包,并返回包长
  len = net_read_packet(net, &complen);
  // 包长度为为0xffff时,循环读取后续包
  // 直到后续包长度不为0xffff
  if (len == MAX_PACKET_LENGTH) {
    /* First packet of a multi-packet.  Concatenate the packets */
    ulong save_pos = net->where_b;
    size_t total_length = 0;
    do {
      net->where_b += len;
      total_length += len;
      len = net_read_packet(net, &complen);
    } while (len == MAX_PACKET_LENGTH);
    if (len != packet_error) len += total_length;
    net->where_b = save_pos;
  }
  net->read_pos = net->buff + net->where_b;
  if (len != packet_error)
    net->read_pos[len] = 0/* Safeguard for mysql_use_result */
}

net_read_packet()

将packet数据读入buffer中, 并返回当前接收到的这个packet的长度

/*
   @return The length of the packet, or @c packet_error on error.
*/

static size_t net_read_packet(NET *net, size_t *complen) {
  size_t pkt_len, pkt_data_len;
  ...
  /* Retrieve packet length and number. */
  if (net_read_packet_header(net)) goto error;
  ...
  /* 读取packet的前3个字节,获取当前收到packet的payload_length */
  pkt_len = uint3korr(net->buff + net->where_b);
  ...
  /* 计算包括之前一共获取到的packet数据长度 */
  pkt_data_len = max(pkt_len, *complen) + net->where_b;
  ...
  /* 
    通过net_realloc()扩容net_buffer
    若总的packet长度超过max_packet(16M),但是并未超过
    max_allowed_packet时,正常执行扩容net_buffer并读入payload数据
  */

  if ((pkt_data_len >= net->max_packet) && net_realloc(net, pkt_data_len))
    goto error;

  /* Read the packet data (payload). */
  if (net_read_raw_loop(net, pkt_len)) goto error;
  ...
}

#define uint3korr(A)  (uint32_t) (((uint32_t) ((uint8_t) (A)[0])) +\
                                  (((uint32_t) ((uint8_t) (A)[1])) << 8) +\
                                  (((uint32_t) ((uint8_t) (A)[2])) << 16))


net_realloc()

net_buffer动态扩容,总buffer length不能超过 max_allowed_packet大小

bool net_realloc(NET *net, size_t length) {
  uchar *buff;
  size_t pkt_length;
  DBUG_TRACE;
  DBUG_PRINT("enter", ("length: %lu", (ulong)length));

  // 当总的packet长度已经超过max_allowed_packet大小
  // 记录错误,返回true
  // 可以在mysql server日志中看到ER_NET_PACKET_TOO_LARGE的错误
  if (length >= net->max_packet_size) {
    DBUG_PRINT("error",
               ("Packet too large. Max size: %lu", net->max_packet_size));
    /* Error, but no need to stop using the socket. */
    net->error = NET_ERROR_SOCKET_RECOVERABLE;
    net->last_errno = ER_NET_PACKET_TOO_LARGE;
#ifdef MYSQL_SERVER
    my_error(ER_NET_PACKET_TOO_LARGE, MYF(0));
#endif
    return true;
  }
  pkt_length = (length + IO_SIZE - 1) & ~(IO_SIZE - 1);
  ...
#ifdef MYSQL_SERVER
  net->buff = net->write_pos = buff;
#else
  size_t cur_pos_offset = NET_ASYNC_DATA(net)->cur_pos - net->buff;
  net->buff = net->write_pos = buff;
  NET_ASYNC_DATA(net)->cur_pos = net->buff + cur_pos_offset;
#endif
  net->buff_end = buff + (net->max_packet = (ulong)pkt_length);
  return false;
}

发送方

发送过程比较容易理解,直接用go中经常使用的Go-MySQL-Driver
包为例

// Write packet buffer 'data'
func (mc *mysqlConn) writePacket(data []byte) error {
 pktLen := len(data) - 4

    // 超过包的最大值(可以修改),不同版本默认值不同
 if pktLen > mc.maxAllowedPacket {
  return ErrPktTooLarge
 }

 // Perform a stale connection check. We only perform this check for
 // the first query on a connection that has been checked out of the
 // connection pool: a fresh connection from the pool is more likely
 // to be stale, and it has not performed any previous writes that
 // could cause data corruption, so it's safe to return ErrBadConn
 // if the check fails.
 if mc.reset {
  mc.reset = false
  conn := mc.netConn
  if mc.rawConn != nil {
   conn = mc.rawConn
  }
  var err error
  // If this connection has a ReadTimeout which we've been setting on
  // reads, reset it to its default value before we attempt a non-blocking
  // read, otherwise the scheduler will just time us out before we can read
  if mc.cfg.ReadTimeout != 0 {
   err = conn.SetReadDeadline(time.Time{}) // 设置超时
  }
  if err == nil {
   err = connCheck(conn) // 检查连接
  }
  if err != nil {
   errLog.Print("closing bad idle connection: ", err)
   mc.Close()
   return driver.ErrBadConn
  }
 }

 for {
  var size int
         // 大于 1<<24 - 1字节(16M,硬编码,不允许修改,mysql server也是如此),要进行拆包发送
  if pktLen >= maxPacketSize {
            // 接收方读到这个头就会知道还有'后续包'
   data[0] = 0xff
   data[1] = 0xff
   data[2] = 0xff
   size = maxPacketSize
  } else {
   data[0] = byte(pktLen)
   data[1] = byte(pktLen >> 8)
   data[2] = byte(pktLen >> 16)
   size = pktLen
  }
  data[3] = mc.sequence // 包的编号

  // Write packet
  if mc.writeTimeout > 0 { // 写超时
   if err := mc.netConn.SetWriteDeadline(time.Now().Add(mc.writeTimeout)); err != nil {
    return err
   }
  }

  n, err := mc.netConn.Write(data[:4+size]) // 发送数据
  if err == nil && n == 4+size {
   mc.sequence++
   if size != maxPacketSize {
    return nil
   }
   pktLen -= size
   data = data[size:]
   continue // 继续发送'剩余包的数据'
  }

  // Handle error
  if err == nil { // n != len(data)
   mc.cleanup()
   errLog.Print(ErrMalformPkt)
  } else {
   if cerr := mc.canceled.Value(); cerr != nil {
    return cerr
   }
   if n == 0 && pktLen == len(data)-4 {
    // only for the first loop iteration when nothing was written yet
    return errBadConnNoWrite
   }
   mc.cleanup()
   errLog.Print(err)
  }
  return ErrInvalidConn
 }
}

注意事项

  • mysqldump的语句合并问题

需要注意的是mysqldump导出时如果不指定--net-buffer-length,但指定了--opt或--extended-insert, -e(创建多行Insert语句),那么默认单条insert语句的大小是1MB

假设这里的单条insert语句未达到1MB,

insert into t1 values(1,2);

那么mysqldump会帮你把多条insert拼接起来直到大小达到1MB,如下所示

insert into t1 values(1,2),(3,4),(5,6),(7,8).....(n,n);

假如你本来单条insert就超过了1MB,那么mysqldump是不会再拼接的

  • max_allowed_packet变量设置的问题
  1. 该系统变量需要在新的MySQL连接中才生效
  2. 该参数需要在传输双端都进行调整,否则任何一端溢出都会导致数据传输失败
  • 要考虑到max_allowed_packet
    设置的太大而实例内存不足的情况

总结

上面的注意事项也就解释了为什么在业务方迁移时,总的数据量大小只有300多M,但是迁移时mysql server提示packet超过了max_allowed_packet
,而在测试迁移时10G的压测数据反而没有发生任何问题:


文章转载自johopig,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论