在上一篇中我们知道了几种常用的redis client,分别分析了lettuce原生的pipeline处理方式和在使用spring data redis包装后的lettuce处理pipeline时源码细节,并知道了后者直接使用时并不是真正的pipeline操作。那么如果我既想要使用spring-data-redis来操作lettuce的pipeline,又想要真正做到pipeline该怎么处理呢?本节我们就来聊一聊这个问题。与此同时,我们会来进一步地分析下redis connection和pool的内容。
前言
我们先来了解下在spring-data-redis中是如何包装lettuce的连接的,然后会根据这些信息得到上一篇文章中留下的那个问题的解。
连接处理
会先后对r连接池、redisTemplate模式下的连接和shareNativeConnection模式下的连接处理方式进行分析。
连接池
如果想了解连接池的内容,就需要了解下LettuceConnectionFactory。我们来看一下它的属性:
private final LettuceClientConfiguration clientConfiguration;private @Nullable AbstractRedisClient client;private @Nullable LettuceConnectionProvider connectionProvider;private @Nullable LettuceConnectionProvider reactiveConnectionProvider;private boolean validateConnection = false;private boolean shareNativeConnection = true;private @Nullable SharedConnection<byte[]> connection;private @Nullable SharedConnection<ByteBuffer> reactiveConnection;private @Nullable LettucePool pool;/** Synchronization monitor for the shared Connection */private final Object connectionMonitor = new Object();private boolean convertPipelineAndTxResults = true;private RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("localhost", 6379);private @Nullable RedisConfiguration configuration;private @Nullable ClusterCommandExecutor clusterCommandExecutor;
主要属性:
•LettuceClientConfiguration:client的配置,基于commons pool的连接池目前也是基于它;•AbstractRedisClient client:内部维持的redis client对象;•LettuceConnectionProvider connectionProvider: 连接提供者,连接池就是由它来提供•LettuceConnectionProvider reactiveConnectionProvider: reactive模式下的连接提供者•validateConnection:是否校验连接•shareNativeConnection:是否共享本地连接•SharedConnection<byte[]> connection:用于共享的连接,如果shareNativeConnection为false则此处为null•SharedConnection
主要方法:
1. 连接池
/*** @param pool* @deprecated since 2.0, use pooling via {@link LettucePoolingClientConfiguration}.*/@Deprecatedpublic LettuceConnectionFactory(LettucePool pool) {this(new MutableLettuceClientConfiguration());this.pool = pool;}
旧的连接工厂,目前不再使用了,目前使用的是根据LettucePoolingClientConfiguration的配置初始化的连接池。
LettuceConnectionFactory的创建部分见org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration#redisConnectionFactory方法:
@Bean@ConditionalOnMissingBean(RedisConnectionFactory.class)public LettuceConnectionFactory redisConnectionFactory(ClientResources clientResources)throws UnknownHostException {LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(clientResources,this.properties.getLettuce().getPool());return createLettuceConnectionFactory(clientConfig);}
通过clientResources和配置信息来初始化LettuceClientConfiguration。
2. LettuceConnectionConfiguration#getLettuceClientConfiguration方法:
private LettuceClientConfiguration getLettuceClientConfiguration(ClientResources clientResources, Pool pool) {LettuceClientConfigurationBuilder builder = createBuilder(pool);applyProperties(builder);if (StringUtils.hasText(this.properties.getUrl())) {customizeConfigurationFromUrl(builder);}builder.clientResources(clientResources);customize(builder);return builder.build();}
这里通过LettuceClientConfigurationBuilder来构建LettuceClientConfiguration对象的,来看下它的实现:

那么这里会创建LettuceClientConfigurationBuilder对象还是LettucePoolingClientConfigurationBuilder对象呢?需要来看下createBuilder方法:
private LettuceClientConfigurationBuilder createBuilder(Pool pool) {if (pool == null) {return LettuceClientConfiguration.builder();}return new PoolBuilderFactory().createBuilder(pool);}
可以看出如果配置了spring.redis.lettuce.pool的相关信息,这里就会生成LettucePoolingClientConfigurationBuilder对象。我们来看下LettucePoolingClientConfigurationBuilder的build方法:
@Overridepublic LettucePoolingClientConfiguration build() {return new DefaultLettucePoolingClientConfiguration(super.build(), poolConfig);}GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig()
这里最后生成的是DefaultLettucePoolingClientConfiguration对象,内部使用的连接池配置为commons-pool提供的GenericObjectPoolConfig。
3. 我们来看下LettuceConnectionFactory的初始化部分,.LettuceConnectionFactory#afterPropertiesSet:
public void afterPropertiesSet() {this.client = createClient();this.connectionProvider = createConnectionProvider(client, LettuceConnection.CODEC);--------省略集群模式下的---------}}
这里省略掉集群模式下的初始化方式的分析,因为单机和集群版的流程大致相同,只是一些配置不太一样。
4. createConnectionProvider方法:
private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {LettuceConnectionProvider connectionProvider = doCreateConnectionProvider(client, codec);if (this.clientConfiguration instanceof LettucePoolingClientConfiguration) {return new LettucePoolingConnectionProvider(connectionProvider,(LettucePoolingClientConfiguration) this.clientConfiguration);}return connectionProvider;}
由于上面生成的是DefaultLettucePoolingClientConfiguration,这里最后生成的就是LettucePoolingConnectionProvider对象。也就是一个池化的对象。
接下来我们来看一看从它里面获取连接的方法org.springframework.data.redis.connection.lettuce.LettucePoolingConnectionProvider#getConnection:
private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap<>(32);@Overridepublic <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {GenericObjectPool<StatefulConnection<?, ?>> pool = pools.computeIfAbsent(connectionType, poolType -> {return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),poolConfig, false);});try {StatefulConnection<?, ?> connection = pool.borrowObject();poolRef.put(connection, pool);return connectionType.cast(connection);} catch (Exception e) {throw new PoolException("Could not get a resource from the pool", e);}}
pools是用来维护Connection类型与GenericObjectPool连接池之间关系的一个map,每次获取连接时会根据连接类型获取到对应的连接池,然后从连接池中获取连接。在LettucePoolingConnectionProvider内部包装着一个StandaloneConnectionProvider类型的provider,它才是最终提供connection对象的provider,代码部分为:
ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),poolConfig, false)
org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory#getConnection
该方法的代码如下:
private @Nullable LettucePool pool;public RedisConnection getConnection() {if (isClusterAware()) {return getClusterConnection();}LettuceConnection connection;if (pool != null) {connection = new LettuceConnection(getSharedConnection(), getTimeout(), null, pool, getDatabase());} else {connection = new LettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase());}connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);return connection;}
这里需要注意一点,这个pool是LettucePool对象,在上文中已经分析过,它是比较老的api中的使用的pool,在新的中使用commons-pool代替了。所以这里在新的配置环境中会走pool==null这个分支。LettuceConnection的第一个入参是StatefulConnection<byte[], byte[]> asyncSharedConn对象,也就是说getSharedConnection方法返回的结果是asyncSharedConn。这个会对LettuceConnection的getAsyncConnection方法产生影响,该方法代码如下:
RedisClusterAsyncCommands<byte[], byte[]> getAsyncConnection() {if (isQueueing()) {return getAsyncDedicatedConnection();}if (asyncSharedConn != null) {if (asyncSharedConn instanceof StatefulRedisConnection) {// 如果asyncSharedConn不为空,则会通过它的async方法来创建RedisClusterAsyncCommands对象return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).async();}}// 如果asyncSharedConn为空,则走getAsyncDedicatedConnection方法来创建RedisClusterAsyncCommandsreturn getAsyncDedicatedConnection();}
如果asyncSharedConn不为空,这里会通过asyncSharedConn的async()方法来生成RedisClusterAsyncCommands对象。
如果asyncSharedConn为空,则会调用getAsyncDedicatedConnection方法来生成RedisClusterAsyncCommands对象:
protected RedisClusterAsyncCommands<byte[], byte[]> getAsyncDedicatedConnection() {if (asyncDedicatedConn == null) {asyncDedicatedConn = doGetAsyncDedicatedConnection();if (asyncDedicatedConn instanceof StatefulRedisConnection) {((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).sync().select(dbIndex);}}if (asyncDedicatedConn instanceof StatefulRedisConnection) {return ((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).async();}if (asyncDedicatedConn instanceof StatefulRedisClusterConnection) {return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncDedicatedConn).async();}------------省略部分代码----------------}org.springframework.data.redis.connection.lettuce.LettuceConnection#doGetAsyncDedicatedConnection:protected StatefulConnection<byte[], byte[]> doGetAsyncDedicatedConnection() {return connectionProvider.getConnection(StatefulConnection.class);}
可以看出最终doGetAsyncDedicatedConnection方法也是通过connectionProvider对象来获取StatefulConnection类型的连接对象的。
那么,调用哪个方法会返回RedisClusterAsyncCommands对象呢?
在LettuceConnection中只有一个public的方法返回RedisClusterAsyncCommands对象的:
@Overridepublic RedisClusterAsyncCommands<byte[], byte[]> getNativeConnection() {LettuceSubscription subscription = this.subscription;return (subscription != null ? subscription.getNativeConnection().async() : getAsyncConnection());}
可以看出,它调用的实际上也是getAsyncConnection()方法。
shareNativeConnection 参数
来看一下org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory#getSharedConnection方法:
@Nullableprotected StatefulRedisConnection<byte[], byte[]> getSharedConnection() {return shareNativeConnection ? (StatefulRedisConnection) getOrCreateSharedConnection().getConnection() : null;}
这里有一个很重要的参数——shareNativeConnection,如果shareNativeConnection为true,会使用getOrCreateSharedConnection().getConnection()来操作,它的第一步返回的是SharedConnection对象,然后通过getConnection()来获取native连接。我们来看下它们的方法:
org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory#getOrCreateSharedConnection:private SharedConnection<byte[]> getOrCreateSharedConnection() {synchronized (this.connectionMonitor) {if (this.connection == null) {// 如果为空,则创建 SharedConnectionthis.connection = new SharedConnection<>(connectionProvider);}// 如果不为空,则使用相同连接return this.connection;}}org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.SharedConnection#getConnection:@NullableStatefulConnection<E, E> getConnection() {synchronized (this.connectionMonitor) {if (this.connection == null) {// 如果connectin为空则调用getNativeConnection方法获取连接this.connection = getNativeConnection();}if (getValidateConnection()) {// 校验连接validateConnection();}// 如果内部连接已经存在,则返回相同的连接return this.connection;}}org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.SharedConnection#getNativeConnection:/*** Obtain a connection from the associated {@link LettuceConnectionProvider}.** @return the connection.*/private StatefulConnection<E, E> getNativeConnection() {try {// 从provider中获取连接,这里也是从连接池中去获取连接的,返回的也是StatefulConnection类型的连接对象return connectionProvider.getConnection(StatefulConnection.class);} catch (RedisException e) {throw new RedisConnectionFailureException("Unable to connect to Redis", e);}}
这里主要是获取SharedConnection的步骤,最终会从connectionProvider中获取shared连接,连接为StatefulConnection类型。可以看出通过LettuceConnectionFactory#getSharedConnection方法最终获取到的连接为StatefulRedisConnection对象。
shareNativeConnection为true和false的区别
上面我们知道,当shareNativeConnection为true时会通过getOrCreateSharedConnection().getConnection()来初始化LettuceConnection的asyncSharedConn属性。它生成的是SharedConnection对象,然后通过它的getConnection方法获取具体连接的。在它们里面都有一个共同点,会先判断当连接对象为空时会创建新的连接,如果已经初始化过了,则使用已有的连接,即共享连接。
redisTemplate模式下的连接
咱们以this.redisTemplate.opsForValue().get(cacheKey)方法为切入点往下来看。
首先是org.springframework.data.redis.core.DefaultValueOperations#get(java.lang.Object)方法:
@Overridepublic V get(Object key) {return execute(new ValueDeserializingRedisCallback(key) {@Overrideprotected byte[] inRedis(byte[] rawKey, RedisConnection connection) {return connection.get(rawKey);}}, true);}
接着往下走,继续往下看org.springframework.data.redis.core.AbstractOperations#execute方法:
@Nullable<T> T execute(RedisCallback<T> callback, boolean b) {return template.execute(callback, b);}org.springframework.data.redis.core.RedisTemplate#execute(org.springframework.data.redis.core.RedisCallback<T>, boolean):@Nullablepublic <T> T execute(RedisCallback<T> action, boolean exposeConnection) {return execute(action, exposeConnection, false);}org.springframework.data.redis.core.RedisTemplate#execute(org.springframework.data.redis.core.RedisCallback<T>, boolean, boolean):@Nullablepublic <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");Assert.notNull(action, "Callback object must not be null");RedisConnectionFactory factory = getRequiredConnectionFactory();RedisConnection conn = null;try {if (enableTransactionSupport) {// 开启事务时获取连接的方法// only bind resources in case of potential transaction synchronizationconn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);} else {// 获取连接的方法conn = RedisConnectionUtils.getConnection(factory);}boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);RedisConnection connToUse = preProcessConnection(conn, existingConnection);boolean pipelineStatus = connToUse.isPipelined();if (pipeline && !pipelineStatus) {connToUse.openPipeline();}RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));// 执行的部分T result = action.doInRedis(connToExpose);// close pipelineif (pipeline && !pipelineStatus) {connToUse.closePipeline();}// TODO: any other connection processing?return postProcessResult(result, connToUse, existingConnection);} finally {// 释放连接RedisConnectionUtils.releaseConnection(conn, factory);}}
主要有三步操作:
1.通过RedisConnectionUtils.getConnection(factory)来获取连接,底层是通过connectionFactory.getConnection()来获取连接的;2.action.doInRedis:执行操作;3.使用RedisConnectionUtils.releaseConnection方法释放连接
spring-data-redis使用lettuce中假的pipeline的方法
看完了上面的内容,就能知道其实解决办法很简单:获取原生的lettuce连接、获取RedisClusterAsyncCommands对象,然后用原生的操作pipeline的方法来处理,继而释放连接即可。
RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory();LettuceConnection connection = null;try {connection = (LettuceConnection) RedisConnectionUtils.getConnection(connectionFactory);//LettuceConnection connection = (LettuceConnection)redisTemplate.getConnectionFactory().getConnection();RedisClusterAsyncCommands<byte[], byte[]> commands = connection.getNativeConnection();commands.setAutoFlushCommands(false);List<RedisFuture<?>> futures = Lists.newArrayList();for (int i = 0; i < 50; i++) {futures.add(commands.set(("aaa-" + i).getBytes(), ("value-" + i).getBytes()));futures.add(commands.expire(("key-" + i).getBytes(), 3600));}// write all commands to the transport layercommands.flushCommands();// synchronization example: Wait until all futures completeboolean result = LettuceFutures.awaitAll(5, TimeUnit.SECONDS,futures.toArray(new RedisFuture[futures.size()]));}finally {if (connection != null){RedisConnectionUtils.releaseConnection(connection,connectionFactory);}}
上面的代码中主要包括四步:
•获取LettuceConnection对象,不管是否共享连接,底层实际上也都是从连接池中获取连接的,只是连接之间是否可以共享而已•获取RedisClusterAsyncCommands,通过connection.getNativeConnection()方法获取•操作部分:先关掉autoflush,然后将所有的操作加到commands列表中,最后直接flush出去•释放连接
后记
这里只是略显仓促地对上一篇的文章进行一个补充,如果能给你带来一些帮助,不甚荣幸!




