实际上每次ClientPrepareStatement.addBatch都只是放到了AbstractQuery.List<Object>batchedArgs里,没有传到服务端
调用executeBatch方法后,会调用executeBatchSerially,这里有一个不同于以前的认为,mysql批处理一次只会传递一个参数,但只会编译一次sql语句,也就是:
insert语句在客户端只会编译一次,但会发送10000次,只有每次从mysql服务端传递了结果后,才能发送下一条.这就是预编译.并不是说mysql服务端提前编译语句,客户端一次性传递所有参数.不是这样的.
所以批处理优化的核心是,
1.减少了服务端sql编译的时间,因为客户端提前编译好了sql,就不需要服务端在编译了, 比如30万条sql,让服务端编译的话是非常耗费时间的.
2.提交事务时redolog刷盘的时间我们在使用时还必须把autoCommit设置为false,这两个是提高批处理性能的核心参数.
3.binlog同步给从库的时间,将同步方式设置为不需要同步,这样还能减少binlog的复制时间
所以并没有什么黑魔法,我之前以为批处理是mysql通过tcp一次性接受所有的参数然后执行.
以及批量执行时,必须将autoCommit=false,最后统一刷新redolog和binlog,但这样Binlog也会特别大.
最后会调用NativeProtocol.sendQueryPacket()方法向mysql传递查询查询包,底层在调用PacketSender发送tcp数据
protected long[] executeBatchSerially(int batchTimeout) throws SQLException {
...... 关键在于这里,可以看到批处理执行sql实际上是个for循环,一次拿出AbstractQuery.List<Object>batchedArgs里的一个参数,执行拿到结果及后才会执行下一条sql语句
for (batchCommandIndex = 0; batchCommandIndex < nbrCommands; batchCommandIndex++) {
((PreparedQuery<?>) this.query).setBatchCommandIndex(batchCommandIndex);
Object arg = this.query.getBatchedArgs().get(batchCommandIndex);//拿到单条sql语句的参数
.....
updateCounts[batchCommandIndex] = executeUpdateInternal(queryBindings, true);//执行sql语句,拿到结果集.批处理一般都是DML的insert,update,delete语句,因此返回结果集一般是受影响行数
// limit one generated key per OnDuplicateKey statement
getBatchedGeneratedKeys(containsOnDuplicateKeyUpdateInSQL() ? 1 : 0);
......
return (updateCounts != null) ? updateCounts : new long[0];
}
}
protected <M extends Message> ResultSetInternalMethods executeInternal(int maxRowsToRetrieve, M sendPacket, boolean createStreamingResultSet,
boolean queryIsSelectOnly, ColumnDefinition metadata, boolean isBatch) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
try {
JdbcConnection locallyScopedConnection = this.connection;
((PreparedQuery<?>) this.query).getQueryBindings()
.setNumberOfExecutions(((PreparedQuery<?>) this.query).getQueryBindings().getNumberOfExecutions() + 1);
ResultSetInternalMethods rs;
CancelQueryTask timeoutTask = null;
try {
timeoutTask = startQueryTimer(this, getTimeoutInMillis());
if (!isBatch) {
statementBegins();
}
//rs就是每次sql更新的结果集,这表明,必须等一条sql处理完,才能执行批处理的下一条sql语句.
rs = ((NativeSession) locallyScopedConnection.getSession()).execSQL(this, null, maxRowsToRetrieve, (NativePacketPayload) sendPacket,
createStreamingResultSet, getResultSetFactory(), metadata, isBatch);
if (timeoutTask != null) {
stopQueryTimer(timeoutTask, true, true);
timeoutTask = null;
}
} finally {
if (!isBatch) {
this.query.getStatementExecuting().set(false);
}
stopQueryTimer(timeoutTask, false, false);
}
return rs;
} catch (NullPointerException npe) {
checkClosed(); // we can't synchronize ourselves against async connection-close due to deadlock issues, so this is the next best thing for
// this particular corner case.
throw npe;
}
}
}最后修改时间:2025-10-03 20:40:51
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




