DriverManager.getConnection 涉及的核心类:
C
ConnectionUrl需要对 url进行校验, 比如 jdbc:mysql://192.168.101.65:3306/seata_demo
/**
 * Tells whether the given connection string starts with a scheme this driver supports.
 *
 * @param connString
 *            the connection string to check, e.g. {@code jdbc:mysql://192.168.101.65:3306/seata_demo}
 * @return {@code true} when the string matches {@code SCHEME_PTRN} and its decoded scheme is accepted by
 *         {@code Type.isSupported}; {@code false} otherwise
 */
public static boolean isConnectionStringSupported(String connString) {
    if (connString == null) {
        throw ExceptionFactory.createException(WrongArgumentException.class, Messages.getString("ConnectionString.0"));
    }

    Matcher schemeMatcher = SCHEME_PTRN.matcher(connString);
    if (!schemeMatcher.matches()) {
        return false;
    }

    // Only the named group "scheme" is inspected; it is decoded before being checked against the supported types.
    String decodedScheme = decodeSkippingPlusSign(schemeMatcher.group("scheme"));
    return Type.isSupported(decodedScheme);
}
// This is the definition of SCHEME_PTRN, i.e. the URL has to satisfy this regular expression:
//     private static final Pattern SCHEME_PTRN = Pattern.compile("(?<scheme>[\\w\\+:%]+).*");
// The Matcher class performs the actual validation, which is simply a regular-expression check.
校验通过后会生成一个ConnectionUrl实例。ConnectionUrl内部有一个LRU缓存(connectionUrlCache),该缓存底层用LinkedHashMap实现,该数据结构既可以按照key查询值,又能维护一个双向链表,后插入的会排到链表的末尾
/**
 * Returns the {@code ConnectionUrl} for the given connection string and properties, creating and caching it on
 * first use.
 * <p>
 * The cache is first consulted under the read lock. On a miss the write lock is taken, the cache is consulted
 * again, and only then is the connection string parsed and the new instance stored. Every lock is released in a
 * {@code finally} block so that an exception during lookup or parsing cannot leave a lock held.
 *
 * @param connString
 *            the connection string; must not be {@code null}
 * @param info
 *            additional connection properties; together with {@code connString} they form the cache key
 * @return the cached or newly created {@code ConnectionUrl}
 */
public static ConnectionUrl getConnectionUrlInstance(String connString, Properties info) {
    if (connString == null) {
        throw ExceptionFactory.createException(WrongArgumentException.class, Messages.getString("ConnectionString.0"));
    }

    // Build the key used to look the instance up in the cache.
    String connStringCacheKey = buildConnectionStringCacheKey(connString, info);

    ConnectionUrl connectionUrl;
    rwLock.readLock().lock();
    try {
        connectionUrl = connectionUrlCache.get(connStringCacheKey);
    } finally {
        rwLock.readLock().unlock();
    }
    if (connectionUrl != null) {
        return connectionUrl;
    }

    // The read lock has to be released before the write lock is requested.
    rwLock.writeLock().lock();
    try {
        // Check again, in the meantime it could have been cached by another thread.
        connectionUrl = connectionUrlCache.get(connStringCacheKey);
        if (connectionUrl == null) {
            ConnectionUrlParser connStrParser = ConnectionUrlParser.parseConnectionString(connString);
            connectionUrl = Type.getConnectionUrlInstance(connStrParser, info);
            connectionUrlCache.put(connStringCacheKey, connectionUrl); // update the cache
        }
        return connectionUrl;
    } finally {
        rwLock.writeLock().unlock();
    }
}
ConnectionUrl内部会有多个host,后续建立连接时会依次用host连接,直到连接成功
ConnectionImpl的构造方法负责建立连接
/**
 * Creates a connection to the host described by {@code hostInfo}.
 * <p>
 * The first try block copies host, port, database, user and password out of {@code hostInfo}, builds the property
 * set and the {@code NativeSession}, keeps references to frequently used connection properties, and sets up the
 * optional exception interceptors and statement/metadata caches. Any {@code CJException} raised there is translated
 * to an {@code SQLException}. The second try block calls {@code createNewIO(false)} and registers the connection
 * with {@code AbandonedConnectionCleanupThread}; on failure {@code cleanup(ex)} runs before the exception is thrown.
 *
 * @param hostInfo
 *            host, port, database and credentials to connect with
 * @throws SQLException
 *             if initialization fails or the I/O cannot be created
 */
public ConnectionImpl(HostInfo hostInfo) throws SQLException {
try {
// Stash away for later, used to clone this connection for Statement.cancel and Statement.setQueryTimeout().
this.origHostInfo = hostInfo;// e.g. com.mysql.cj.conf.HostInfo@1ed4004b :: {host: "192.168.101.65", port: 3306, hostProperties: {dbname=seata_demo}} -- holds the main connection information
this.origHostToConnectTo = hostInfo.getHost();//host
this.origPortToConnectTo = hostInfo.getPort();//port
this.database = hostInfo.getDatabase();//database
this.user = hostInfo.getUser();//user name
this.password = hostInfo.getPassword();//password
this.props = hostInfo.exposeAsProperties();//expose all hostInfo attributes as Properties key/value pairs
this.propertySet = new JdbcPropertySetImpl();
this.propertySet.initializeProperties(this.props);// initialize the property set from the properties exposed above
// We need Session ASAP to get access to central driver functionality
this.nullStatementResultSetFactory = new ResultSetFactory(this, null);
this.session = new NativeSession(hostInfo, this.propertySet);// the session is built from the host info and the property set
this.session.addListener(this); // listen for session status changes
// client-side settings
// we can't cache fixed values here because properties are still not initialized with user provided values
this.autoReconnectForPools = this.propertySet.getBooleanProperty(PropertyKey.autoReconnectForPools);
this.cachePrepStmts = this.propertySet.getBooleanProperty(PropertyKey.cachePrepStmts);
this.autoReconnect = this.propertySet.getBooleanProperty(PropertyKey.autoReconnect);
this.useUsageAdvisor = this.propertySet.getBooleanProperty(PropertyKey.useUsageAdvisor);
this.reconnectAtTxEnd = this.propertySet.getBooleanProperty(PropertyKey.reconnectAtTxEnd);
this.emulateUnsupportedPstmts = this.propertySet.getBooleanProperty(PropertyKey.emulateUnsupportedPstmts);
this.ignoreNonTxTables = this.propertySet.getBooleanProperty(PropertyKey.ignoreNonTxTables);
this.pedantic = this.propertySet.getBooleanProperty(PropertyKey.pedantic);
this.prepStmtCacheSqlLimit = this.propertySet.getIntegerProperty(PropertyKey.prepStmtCacheSqlLimit);
this.useLocalSessionState = this.propertySet.getBooleanProperty(PropertyKey.useLocalSessionState);
this.useServerPrepStmts = this.propertySet.getBooleanProperty(PropertyKey.useServerPrepStmts);
this.processEscapeCodesForPrepStmts = this.propertySet.getBooleanProperty(PropertyKey.processEscapeCodesForPrepStmts);
this.useLocalTransactionState = this.propertySet.getBooleanProperty(PropertyKey.useLocalTransactionState);
this.disconnectOnExpiredPasswords = this.propertySet.getBooleanProperty(PropertyKey.disconnectOnExpiredPasswords);
this.readOnlyPropagatesToServer = this.propertySet.getBooleanProperty(PropertyKey.readOnlyPropagatesToServer);
// configure the exception interceptors, only when the property names at least one class
String exceptionInterceptorClasses = this.propertySet.getStringProperty(PropertyKey.exceptionInterceptors).getStringValue();
if (exceptionInterceptorClasses != null && !"".equals(exceptionInterceptorClasses)) {
this.exceptionInterceptor = new ExceptionInterceptorChain(exceptionInterceptorClasses, this.props, this.session.getLog());
}
if (this.cachePrepStmts.getValue()) {
createPreparedStatementCaches();
}
if (this.propertySet.getBooleanProperty(PropertyKey.cacheCallableStmts).getValue()) {
this.parsedCallableStatementCache = new LRUCache<>(this.propertySet.getIntegerProperty(PropertyKey.callableStmtCacheSize).getValue());
}
if (this.propertySet.getBooleanProperty(PropertyKey.allowMultiQueries).getValue()) {
this.propertySet.getProperty(PropertyKey.cacheResultSetMetadata).setValue(false); // we don't handle this yet
}
// evaluated after the allowMultiQueries override above, so that override wins
if (this.propertySet.getBooleanProperty(PropertyKey.cacheResultSetMetadata).getValue()) {
this.resultSetMetadataCache = new LRUCache<>(this.propertySet.getIntegerProperty(PropertyKey.metadataCacheSize).getValue());
}
// a configured SOCKS proxy host switches the socket factory to SocksProxySocketFactory
if (this.propertySet.getStringProperty(PropertyKey.socksProxyHost).getStringValue() != null) {
this.propertySet.getProperty(PropertyKey.socketFactory).setValue(SocksProxySocketFactory.class.getName());
}
this.dbmd = getMetaData(false, false);
initializeSafeQueryInterceptors();// set up the query interceptors
} catch (CJException e1) {
throw SQLExceptionsMapping.translateException(e1, getExceptionInterceptor());
}
try {
createNewIO(false);// per the call chain described in this article, this reaches connectOneTryOnly(), which opens the MySQL connection and authenticates
unSafeQueryInterceptors();
AbandonedConnectionCleanupThread.trackConnection(this, this.getSession().getNetworkResources());
} catch (SQLException ex) {
cleanup(ex);
// don't clobber SQL exceptions
throw ex;
} catch (Exception ex) {
cleanup(ex);
// the "paranoid" property selects the message that omits host and port
throw SQLError
.createSQLException(
this.propertySet.getBooleanProperty(PropertyKey.paranoid).getValue() ? Messages.getString("Connection.0")
: Messages.getString("Connection.1",
new Object[] { this.session.getHostInfo().getHost(), this.session.getHostInfo().getPort() }),
MysqlErrorNumbers.SQL_STATE_COMMUNICATION_LINK_FAILURE, ex, getExceptionInterceptor());
}
}
createNewIO -> connectOneTryOnly -> session.connect -> socketConnection.connect -> socketFactory.connect 1.createSocket()创建Socket 2.configureSocket()设置tcp参数,比如是否开启保活探测(keepalive),是否禁用延迟发送(TCP_NODELAY),设置接收缓冲区大小,发送缓冲区大小,以及流量类别(traffic class,即IP头中的TOS字段,并非拥塞控制策略),默认都是0或者null 3.rawSocket.connect()建立tcp连接 设置发送缓冲区和接收缓冲区的大小,所以虽然tcp内核没有设置缓冲区大小,但是在JVM堆里分配了各16KB(16384字节)的缓冲区 this.mysqlInput = new FullReadInputStream(rawInputStream);//预读Stream,每次read都会将缓冲区尽可能全部读满,以减少系统调用.具体体现在read方法,read方法会调用fill方法,该方法会确定每次要读多少数据
this.mysqlOutput = new BufferedOutputStream(this.mysqlSocket.getOutputStream(), 16384); 实例化protocol为NativeProtocol,该类负责实现Mysql通信协议,将上层的sql语句,参数,转换为Mysql应用层协议,交给下层来处理
/**
 * Makes a single attempt to connect the session, with no retry.
 * <p>
 * After the session connects, the connection state read from this object (auto-commit, isolation level, read-only
 * flag, database) is captured, the query interceptors are handed to the session and the server-dependent
 * properties are initialized again. For a reconnect the captured state is then applied back.
 *
 * @param isForReconnect
 *            {@code true} to restore the captured connection state after connecting
 * @throws SQLException
 *             if the attempt fails; an expired-password failure is swallowed when
 *             {@code disconnectOnExpiredPasswords} is {@code false}
 */
private void connectOneTryOnly(boolean isForReconnect) throws SQLException {
    try {
        JdbcConnection proxy = getProxy();
        this.session.connect(this.origHostInfo, this.user, this.password, this.database, getLoginTimeout(), proxy);

        // save state from old connection
        boolean previousAutoCommit = getAutoCommit();
        int previousIsolationLevel = this.isolationLevel;
        boolean previousReadOnly = isReadOnly(false);
        String previousDb = getDatabase();

        this.session.setQueryInterceptors(this.queryInterceptors);

        // Server properties might be different from previous connection, so initialize again...
        initializePropsFromServer();

        if (isForReconnect) {
            // Restore state from old connection
            setAutoCommit(previousAutoCommit);
            setTransactionIsolation(previousIsolationLevel);
            setDatabase(previousDb);
            setReadOnly(previousReadOnly);
        }
    } catch (UnableToConnectException rejected) {
        close();
        this.session.getProtocol().getSocketConnection().forceClose();
        throw rejected;
    } catch (Exception failure) {
        boolean passwordExpired = failure instanceof PasswordExpiredException
                || failure instanceof SQLException && ((SQLException) failure).getErrorCode() == MysqlErrorNumbers.ER_MUST_CHANGE_PASSWORD;
        if (passwordExpired && !this.disconnectOnExpiredPasswords.getValue()) {
            // The connection is kept open even though the password has expired.
            return;
        }

        if (this.session != null) {
            this.session.forceClose();
        }

        // Rethrow in order of preference: the SQLException itself, an SQLException cause, a CJException,
        // and finally a generic "unable to connect" SQLException wrapping the failure.
        if (failure instanceof SQLException) {
            throw (SQLException) failure;
        }
        Throwable cause = failure.getCause();
        if (cause instanceof SQLException) {
            throw (SQLException) cause;
        }
        if (failure instanceof CJException) {
            throw (CJException) failure;
        }

        SQLException unableToConnect = SQLError.createSQLException(Messages.getString("Connection.UnableToConnect"),
                MysqlErrorNumbers.SQL_STATE_UNABLE_TO_CONNECT_TO_DATASOURCE, getExceptionInterceptor());
        unableToConnect.initCause(failure);
        throw unableToConnect;
    }
}
// Article note: the method shown next sets the TCP parameters of the connection.
/**
 * Applies the TCP-related connection properties to the given socket.
 * <p>
 * {@code tcpNoDelay} and {@code tcpKeepAlive} are always applied. The receive buffer size, send buffer size and
 * traffic class are applied only when the configured value is greater than zero.
 *
 * @param sock
 *            the socket to configure
 * @param pset
 *            the property set the values are read from
 * @throws SocketException
 *             if the socket rejects one of the options
 * @throws IOException
 *             declared by the signature; nothing in this body throws it beyond {@code SocketException}
 */
private void configureSocket(Socket sock, PropertySet pset) throws SocketException, IOException {
    boolean noDelay = pset.getBooleanProperty(PropertyKey.tcpNoDelay).getValue();
    sock.setTcpNoDelay(noDelay);

    boolean keepAlive = pset.getBooleanProperty(PropertyKey.tcpKeepAlive).getValue();
    sock.setKeepAlive(keepAlive);

    int rcvBufBytes = pset.getIntegerProperty(PropertyKey.tcpRcvBuf).getValue();
    if (rcvBufBytes > 0) {
        sock.setReceiveBufferSize(rcvBufBytes);
    }

    int sndBufBytes = pset.getIntegerProperty(PropertyKey.tcpSndBuf).getValue();
    if (sndBufBytes > 0) {
        sock.setSendBufferSize(sndBufBytes);
    }

    int tc = pset.getIntegerProperty(PropertyKey.tcpTrafficClass).getValue();
    if (tc > 0) {
        sock.setTrafficClass(tc);
    }
}
最后修改时间:2025-10-03 19:49:18
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




