暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Redis入门看这一篇就够了!(两万字总结)

coderchen 2021-06-18
927

Redis入门篇

本文大纲:

1.官方中文文档

http://www.redis.cn/

2.linux下安装

  • 官网下载安装包 redis-6.0.6.tar.gz

  • 放到服务器的usr/local/redis目录下

  • 解压:tar -zxvf redis-6.06.tar.gz

  • 在redis目录下进行基本的环境安装:yum install gcc-c++

  • 检查gcc的版本:gcc -v

  • 在redis目录下执行make
    (若报错可能是gcc的版本问题)

  • 执行make install

  • 进入/usr/local/bin目录,这是redis的默认安装目录。在这个目录创建一个myredisconfig的文件夹。然后将/usr/local/redis/redis-6.0.6下的redis.conf拷贝到这个文件夹下
  • 设置为后台自动启动:修改myredisconfig目录下的redis.conf daemonize no
    改为daemonize yes
    ,然后:wq!
    保存退出
  • 通过配置文件启动redis(在/usr/local/bin目录下)

redis-server myredisconfig/redis.conf

  • redis客户端测试连接

    redis-cli -p 6379

    ping

  • 退出redis

shutdown

exit

  • 查看redis进程

ps -ef|grep redis

3.安装出现的问题

  • 在安装redis6以上的版本时make命令会出现错误,原因是gcc版本太低(centos默认版本为4),可以选择安装低版本redis5或者升级gcc到8版本,通过gcc -v命令查看版本.
  • 升级gcc版本的方法

https://www.cnblogs.com/jixiaohua/p/11732225.html

4. 压力测试工具

redis-benchmark是官方自带的性能测试工具,可以有效的测试redis服务的性能。在bin目录下。

测试demo:

测试100个并发连接,10000个请求,检测host为localhost 端口为6379的redis服务器性能。

redis-benchmark -h localhost -p 6379 -c 100 -n 10000

相关参数说明:

5. 基本的数据库命令

  • redis命令手册:https://redis.com.cn/commands.html

  • redis默认有16个数据库 0~15,默认使用第0个。

    查看redis.conf可以得知:

  • 进入redis:redis-server myredisconfig/redis.conf
       redis-cli -p 6379

  • 退出redis:shutdown
        exit

  • 输入密码:auth 311046

  • 切换数据库:select n
       n表示第几个数据库

  • 查看当前数据库的key的数量:dbsize

  • 查看具体的key:key *

  • 清空当前库:flushdb

  • 清空全部的库:flushall

  • 设置key值:set name coderchen

  • 获取key值:get name

  • 查看 key是否存在:exists keyname
    例如  exists name

  • 设置key生存时间当key过期会被自动删除:expire key 秒数

  • 查看还有多少秒过期:ttl name
    -1表示永不过期,-2表示已经过期。

  • 查看key的类型:type key

  • 删除key:  del key

注意:redis命令不区分大小写,但是记录的数据区分大小写

  • 为什么redis默认端口是6379?

https://www.zhihu.com/question/20084750

6. redis单线程快?

  • 为什么redis是单线程?

redis是基于内存的操作,cpu不是redis的瓶颈,redis的瓶颈是机器内存大小或者网络带宽。既然单线程容易实现而且cpu不会成为瓶颈,那么就顺理成章的采用单线程的方案了。redis是C语言编写的,官方数据可以达到100000+QPS(每秒查询次数)

  • redis为什么这么快?

并不是多线程一定比单线程快,效率高。这与cpu,内存,硬盘的速度有关。

redis单线程快是因为redis是基于内存的操作。单线程效率最高,因为多线程的本质就是cpu模拟多个线程的情况,这其中有一个大的开销就是cpu上下文的切换。而redis单线程对内存中的数据进行读写时,速度比较快。举个例子就是读取1MB的数据时,可能多个进程cpu上下文的切换时间就大于单个进程读取数据的时间

key值加引号和不加的区别?

7.字符串String(单值单value)

  • string是redis 的基本类型,一个key对应一个value。string类型是二进制安全的,即redis的string可以包含任何数据,比如jpg图片或者序列化的对象,string类型redis最基本的数据类型,一个redis中字符串value最多可以是512M

常用命令说明:

官方命令手册:https://redis.com.cn/commands.html

string数据结构是简单的key-value类型,value不仅可以是string,也可以是数字

  • append key1 "hello"
    :对不存在的key进行append,等同于set key1 "hello"

  • append key1 ",world"
    :对已经存在的key  append就是在后面追加。

  • strlen key1
    :获取字符串长度

  • set views 0
    :要设置为数字才能用incr,decr命令 ,本质上这是一个字符串操作,因为Redis没有专门的整数类型。存储在 key 中的字符串被转换为十进制有符号整数
  • incr views
    :执行一次就将key中存储 的数字值增一
  • decr views
    :执行一次就将key中存储的数字值减一
  • incrby views 10
    :执行一次key中存储的值+10
  • decrby views 10
    :执行一次key中存储的值-10
  • getrange key2 m n
    :获取[m,n]区间范围内的字符串。n可以为负数,表示偏移,-1 表示最后一个字符, -2 表示倒数第二个字符,以此类推。

  • setrange key2 n value
    :将key2的值从第n个开始替换为指定的value

  • setex key2 10 "hello"
    :将key2的值设置为hello,并且key2的生存时间设置为10s。这个命令等价于set key2 "hello"
      expire key2 10
     但区别是setex 命令是一个原子操作,在同一时间内完成设置值和设置过期时间这两个操作。

  • setnx key3 "redis"
    :在指定的key不存在时,为key设置指定的值,这种情况下等同于set,当key存在时,什么也不做。返回值:0 被设置了      1 没有被设置。

  • mset key1 value1 key2 value2 ......keyn valuen
    :设置多个key和value,如果这其中已经存在key,则会覆盖掉。
  • mget key1 key2 ...keyn
    :返回所有给定的key的值
  • msetnx key1 value1 key2 value2 ...keyn valuen
    :设置多个 key value,当且仅当所有给定的健都不存在时,即使有一个健存在值,msetnx也会拒绝执行操作。
  • getset key value
    :将key的值修改为value,并返回在修改之前的值。如果key在执行这个命令前不存在,则返回nil。
  • 可以用来缓存对象

mset user:1:name zhangsan user:1:age 2

mget user:1:name user:1:age

应用

  • 微博数,粉丝数等

8. 列表List(单值多value)

  • redis的列表是简单的字符串列表,按照插入顺序排序,你可以添加一个元素到列表的头部(左边)或者尾部(右边)
  • 它的底层就是一个链表

常用命令说明

官方命令手册:https://redis.com.cn/commands.html

  • lpush list "one"
    :将值插入到列表的头部(左边)
  • rpush list "two"
    :将值插入到列表的尾部(右边)
  • lrange list n m
    :返回指定区间的元素  [n,m]  m为-1表示返回n后面所有的元素。
  • lpop list
    :移除并返回列表的第一个元素,左边的第一个
  • rpop list
    :移除并返回列表的最后一个元素,右边第一个。
  • lindex list n
    :按照索引下标n获得元素(-1代表最后一个,0表示第一个,以此类推)

  • llen list
    :返回列表的长度

  • lrem list n value
    :移除列表中与value相等的元素,移除n个,顺序是从前往后。n>0 从头到尾删除值为value的元素, n<0,从尾到头删除值为value的元素, n=0,移除所有值为value的元素

  • ltrim list n m
    :对一个列表进行修剪,只保留区间[n,m]内的元素。其他全部删除。

  • rpoplpush list1 list2
    :移除列表list1的最后一个元素,并将该元素添加到list2的前边。这个操作是原子性的。从6.2.0起被废除了。

  • lset list n value
    :将list中第n个元素的值设置为value

  • linsert list before|after value value1
    :将value1插入到value的前边或者后边。

性能总结

  • 它是一个字符串列表,left,right都可以插入添加
  • 如果键不存在,创建新的链表。
  • 如果已经存在,新增内容
  • 如果值全部移除,对应的键也就消失了
  • 链表的操作无论是头和尾效率都极高,但假如是对中间元素进行操作,效率就比较低。

应用

  • 最新消息排行功能
  • 消息对列

9. 集合Set(单值多value)

  • redis的集合set是string类型的无序集合,是通过HashTable实现的。

常用命令说明

  • sadd key member[...]
    :将一个或者多个成员元素加入到集合中。
  • smembers key
    :返回存储在key中的所有成员
  • sismember key member
    :判断元素member是否是集合key中的成员。返回1表示是,0表示不是。

  • scrad key
    :返回集合中元素的数量。

  • srem key value
    :移除集合key中的一个或者多个元素value

  • srandmember key [count]
    :返回集合key中的随机元素,count表示返回几个元素。

  • spop key [count]
    :从集合key中删除一个或者多个元素,并返回删除的元素。(删除是随机的)

  • smove key1 key2 member
    :将指定的元素从集合key1移动到key2  。这是一个原子操作。

  • sdiff key [key...]
    :返回第一个集合key与其他key之间的差异,即第一个集合key独有的元素。
  • sinter key [key...]
    :返回所有给定集合key的成员交集
  • sunion key [key...]
    :返回所有给定集合key的并集

应用

  • 实现共同关注,共同好友等功能。

10. 哈希Hash(单值多value,v是一个键值对)

  • Hash是一个键值对集合,类似于java中的map,是一个String类型的field和value的映射表,特别适合存储对象。

常用命令

  • hset key field value [field value ....]
    : 为哈希表key的field字段赋值
  • hget key field
    :返回哈希表key中的field字段的值
  • hgetall key
    :返回哈希表key中所有的域和值
  • hdel key field [field...]
    :删除哈希表key中的一个或者多个指定字段。

  • hlen key
    :获取哈希表key中的字段fields的数量

  • hexists key field
    :查看哈希表的指定字段field是否存在。0表示不存在,1表示存在
  • hkeys key
    :返回哈希表key中所有的域field
  • hvals key
    :返回哈希表中所有域的值

  • hincrby key field increment
    : 为哈希表key中的域field的加上增量increment.增量为负数表示进行减法操作。

  • hsetnx key field value
    :为哈希表中不存在的字段field赋值为value。0表示设置失败,1表示设置成功。

应用

  • Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象。存储部分变更的数据,如用户信息等。

11. 有序集合Zset

  • 在set基础上,加上一个score。之前set是sadd set1 v1 v2 v3…,现在Zset是zadd zset1 score1 v1 score2 v2。

常用命令

  • zadd key score1 member1[score2 member2...]
    :将一个或者多个member元素及其score值加入到有序集合key中。其中score可以是整数值或者双精度浮点数。可以为正或者负数
  • zrange key n m
    :返回有序集合key中的[n,m]区间内的元素。下标从0开始。如果m超出范围也不会报错,只会返回包含的。

  • zrangebyscore key n m
    :返回有序集合key中的成员,按照score的从小到大排序,范围是[n,m],n,m可以为-inf,+inf,这样在不知道有序集合数量的时候获取所有的成员注意[n,m]这个范围是score的范围,不是成员的下标

  • zrem key member1[member2...]
    :从有序集合key中删除指定的成员member

  • zcard key
    :返回有序集合key中的成员个数

  • zcount key n m
    :返回有序集key中,score在[n,m]之间的成员数量。

  • zrank key member
    :返回有序集key中成员的排名,按照score从低到高排名,从0开始

  • zrevrank key member
    :返回有序集key中成员的排名,按照score从高到低排名,从0开始

应用

  • 排行榜应用,取top n操作。

12. 三种特殊的数据类型

12.1 GEO地理位置

简介:

redis的GEO特性在redis3.2版本中推出,这个功能可以将用户给定的地理位置信息存储起来,并对这些信息进行操作。来实现诸如附近位置,摇一摇这类依赖于地理位置信息的功能GEO的数据类型为Zset。

GEO的数据结构总共有六个常用命令:geoadd ,geopos ,geodist , georadius, georadiusbymember , gethash

官方文档:https://www.redis.net.cn/order/3685.html

命令:

geoadd

将指定的地理空间位置(经度,纬度,名称)添加到指定的key中。这些数据会存储到有序集合Zset中,目的是为了方便使用georadius,或者georadiusbymember命令对数据进行半径查询等操作。

geoadd china:city 121.48 31.40 shanghai 113.88 22.55 shenzhen 120.21 30.20 hangzhou

注意点:该命令采用标准格式的参数xy,所以经度必须在纬度之前。有效的经度 -180~180度,有效的纬度 -85.05~85.05


geopos

从key里面返回所有给定位置元素的位置(经度和纬度)

geopos key member[member...]

geopos china:city shanghai hangzhou


geodist

返回两个给定位置之间的距离,指定的参数unit必须是以下中的一种。m 米 ,km 千米, mi 英里,ft 英尺,默认是m。

geodist key member1 member2 [unit]

geodist china:city shanghai hangzhou km


georadius

以给定的经纬度为中心,找出某一半径内的元素。

范围的单位:m米,km千米 ,mi英里 ,ft英尺

georadius key longitude(经度) latitude(纬度) radius m|km|ft|mi [withcoord] [withdist][withhash][asc|desc][count count]

withdist:返回位置元素的同时,将位置元素与中心之间的距离也一并返回。距离的单位与用户给定范围单位保持一致。

withcoord:将位置元素的经纬度也返回。

withhash:以 52 位有符号整数的形式, 返回位置元素经过原始 geohash 编码的有序集合分值。这个选项主要用于底层应用或者调试, 实际中的作用并不大

asc: 根据中心的位置, 按照从近到远的方式返回位置元素。

desc:根据中心的位置, 按照从远到近的方式返回位置元素。

count:获取前n个匹配的元素,可以减少带宽当数据量大时。

举例:

georadius china:city 120 30 1500 km withdist


georadiusbymember

找出位于指定范围内的元素,中心点是由给定的位置元素决定的

georadiusbymember key member radius m|km|ft|mi [withcoord][withdist][withhash][asc|desc][count count]

举例:

georadiusbymember china:city shanghai 1500 km


geohash

返回一个或者多个位置元素的geohash表示。

redis使用geohash将二维经纬度转换为一维字符串,字符串越长表示位置更精确,两个字符串越相似表示距离越近。

geohash key member[member...]

举例:

geohash china:city shanghai hangzhou


zrem

geo没有提供删除成员的方法,但是因为geo的底层是zset,所有可以用zrem命令实现对地理位置信息的删除。

zrem china:city shanghai
:移除元素

zrange china:city 0 -1
:查看所有元素

命令演示

12.2 HyperLogLog

简介

redis在2.8.9版本添加了HyperLogLog结构。

redis HyperLogLog是用来做基数统计的算法。HyperLogLog的优点是在输入元素的数量和体积非常大时,计算基数所需的空间总是固定的,并且很小

基数:比如数据集 {1, 3, 5, 7, 5, 7, 8}, 那么这个数据集的基数集为 {1, 3, 5 ,7, 8}, 基数(不重复元素)为5。基数估计就是在误差可接受的范围内,快速计算基数。

在redis里面,每个HyperLogLog键只需要花费12kb内存,就可以计算接近2^64个不同元素的基数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。

HyperLogLog则是一种算法,提供了不精确的去重基数方案。

举个栗子:假如我要统计网页的UV(浏览用户数量,一天内同一个用户多次访问只能算一次),传统的 解决方案是使用Set来保存用户id,然后统计Set中的元素数量来获取页面UV。但这种方案只能承载少量 用户,一旦用户数量大起来就需要消耗大量的空间来存储用户id。我的目的是统计用户数量而不是保存 用户,这简直是个吃力不讨好的方案!而使用Redis的HyperLogLog最多需要12k就可以统计大量的用户 数,尽管它大概有0.81%的错误率,但对于统计UV这种不需要很精确的数据是可以忽略不计的

基本命令

pfadd key element[element...]
:添加指定元素到HyperLogLog

pfcount key[key...]
:返回给定HyperLogLog的基数估计值。

pfmerge destkey sourcekey [sourcekey...]
:将多个HyperLogLog合并为一个HyperLogLog,并集计算。

12.3 BitMap

简介

    在开发中,可能会遇到这种情况:需要统计用户的某些信息,如活跃或不活跃,登录或者不登录;又如 需要记录用户一年的打卡情况,打卡了是1, 没有打卡是0,如果使用普通的 key/value存储,则要记录 365条记录,如果用户量很大,需要的空间也会很大,所以 Redis 提供了 Bitmap 位图这种数据结构, Bitmap 就是通过操作二进制位来进行记录,即为 0 和 1;如果要记录 365 天的打卡情况,使用 Bitmap 表示的形式大概如下:0101000111000111...........................,这样有什么好处呢?当然就是节约内存 了,365 天相当于 365 bit,又 1 字节 = 8 bit , 所以相当于使用 46 个字节即可。

BitMap 就是通过一个 bit 位来表示某个元素对应的值或者状态, 其中的 key 就是对应元素本身,实际上 底层也是通过对字符串的操作来实现。Redis 从 2.2 版本之后新增了setbit, getbit, bitcount 等几个 bitmap 相关命令。

基本命令
  • setbit key n value
    :设置key的第n位的值value是0或1,n是从0开始的。
  • getbit key n
    :获取第n位的值。如果没有设置返回0
  • bitcount key[start,end]
    :统计key上值为1的个数。

13. redis事务

理论


redis事务的概念:

redis事务的本质是一组命令的集合。事务支持一次执行多个命令,一个事务中所有的命令都会被序列化。在事务执行过程中,会按照顺序串行化执行对列中 的命令,其他客户端提交的命令请求不会插入到事务执行的命令序列中。

总结的说:redis事务就是一次性,顺序性,排他性的执行一个对列中的一系列命令。


redis事务没有隔离级别的概念:

批量操作在发送exec命令前被放入对列缓存,并不会被实际执行。


redis不保证原子性:

redis中,单条命令是原子性执行的,但事务不保证原子性,且没有回滚。事务中任意命令执行失败,其余的命令仍会被执行。


redis事务的三个阶段:

开始事务

命令入队

执行事务


redis事务的相关命令

  • watch key1 key2...
    :监视一个或者多个key,如果在事务执行之前,被监视的key被其他命令改动,则事务被打断(类似乐观锁)
  • unwatch
    :取消watch对所有key的监控。
  • multi
    :标记一个事务块的开始(queued)
  • exec
    :执行所有事务块的命令(一旦执行exec后,之前加的监控锁都会被取消掉)
  • discard
    :取消事务,放弃事务块中所有 的命令

乐观锁和悲观锁?

  • 悲观锁(Pessimistic Lock),顾名思义就是很悲观,每次拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿到这个数据就会block知道它拿到锁。传统的关系型数据库里面就用到了很多这种锁机制,比如行锁,表锁等,读锁写锁等,都是在操作之前先上锁。
  • 乐观锁(Optimistic Lock)顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制,乐观锁适用于多读的应用类型,这样可以提高吞吐量,乐观锁策略:提交版本必须大于记录当前版本才能执行更新。

实践

  • 正常执行

  • 放弃事务

  • 如果事务对列中存在命令性错误(类似java编译性错误),执行exec命令时,所有命令都不会执行。

  • 如果在事务对列中存在语法性错误(类似于java的1/0运行时异常),则执行exec命令时,其他正确命令会被执行,错误命令抛出异常。

  • watch命令的演示(watch 用来监控key在事务执行的前后是否变化)–事务成功执行

  • 使用watch事务执行失败

    由于在事务执行过程中,watch监视的值出现了变化,因此导致了错误,这时要放弃监视,然后重来。

    **注意:**一旦exec执行事务后,无论事务执行成功还是失败,watch对变量的监控都被取消。因此当事务执行失败后,需要重新执行watch对变量进行监控,并开启新的事务进行操作。

    watch指令类似于乐观锁,事务提交时,如果watch监控的多个key中任何key的值被其他客户端更改,则使用exec执行事务时,事务对列不会被执行,同时返回Nullmulti-bulk 通知调用者事务执行失败。

14. springboot整合redis

步骤:

  • 导入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

  • 在application.properties中配置
  spring.redis.host=115.29.232.195
spring.redis.port=6379
spring.redis.database=0
spring.redis.timeout=50000

  • 测试
  @SpringBootTest
  class SpringbootRedisApplicationTests {
      @Autowired
      private RedisTemplate redisTemplate;
  
      @Test
      void contextLoads() {
          /*
          * opsForList:操作list,类似string
          * opsForGeo 操作geo
          * opsForSet
          * .......
          * 和redis命令行一样。
          * */

         /*获取redis的连接对象*/
  //        RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
  //        connection.flushDb();
  //        connection.flushAll();
  
          redisTemplate.opsForValue().set("mykey","kuangshen");
          System.out.println(redisTemplate.opsForValue().get("mykey"));
          redisTemplate.opsForSet().add("k5","v9");
          Set k5 = redisTemplate.opsForSet().members("k5");
          System.out.println(k5);
  
      }
  }
  

redis要做的配置

  • 阿里云设置redis的安全组:端口6379

  • 修改redis.conf

    (在/usr/local/bin/myredisconfig/redis.conf)

    bind 127.0.0.1 改为 bind 0.0.0.0(注释掉也行

    protected-mode yes 改为 protected-mode no:  (即该配置项表示是否开启保护模式,默认是开启,开启后Redis只会本地进行访问,拒绝外部访问)

    daemonize no 改为 daemonize yes

    注意:修改后要重新登录redis


  • 关于防火墙的设置

    rpm -qa|grep firewalld;rpm -qa|grep firewall-config
    :检查系统是否安装了firewalld和firewall-config.CentOS中系统默认安装firewalld, firewall-config要自己安装

    yum -y update firewalld
    :将firewalld更新为最新版

    yum -y install firewall-config
    :安装firewall-config

    systemctl start firewalld
    :启动firewalld服务

    systemctl status firewalld
    :查看firewalld的状态

    systemctl stop firewalld
    :停止firewalld服务

    systemctl enable firewalld
    :恢复开机自动启动firewalld服务

    推荐参考文章:CentOS7中firewalld的安装与使用详解 https://blog.csdn.net/solaraceboy/article/details/78342360


  • 检查阿里云的防火墙是否开放端口号

    firewall-cmd --query-port=6379/tcp
     如果是 yes,则就是开放的。

    如果是no

    则永久开放6379端口:firewall-cmd --zone=public --add-port=6379/tcp --permanent

    重新加载:firewall-cmd --reload

    然后再查看端口是否开启:firewall-cmd --query-port=6379/tcp

  • 查看redis服务进程

     ps -ef | grep redis

  • 如果上面的方法都试过了但还是不行,试试重启服务器

推荐一个好用的redis客户端工具

Another Redis Desktop Manager

下载地址:https://github.com/qishibo/AnotherRedisDesktopManager、

可以用它来测试和管理redis

redis的序列化配置

  • 通过源码可知,springboot自动帮我们在容器中生成了一个redisTemplate和一个StringRedisTemplate,但是这个redisTemplate的泛型是<ooject,object>,写代码不方便,需要类型转换的代码,并且没有设置数据存在redis时,key及value的序列化方式。因此要自定义一个配置类。
  • 为什么要序列化:https://www.jianshu.com/p/cc5a29b06b3d
package com.kuang.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.net.UnknownHostException;

/**
 * Created with Intellij IDEA
 * Description:
 * user: CoderChen
 * Date: 2021-06-06
 * Time: 14:05
 */

@Configuration

public class redisConfig {
    /*编写自己的redisTemplate----固定模板*/
    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        /*为了自己开发方便,一般直接使用<String,object>*/
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        /*json序列化配置*/
        Jackson2JsonRedisSerializer Jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        Jackson2JsonRedisSerializer.setObjectMapper(om);
        /*string序列化配置*/
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();


        /*配置具体的序列化方式*/
        /*key采用string的序列化方式*/
        template.setKeySerializer(stringRedisSerializer);
        /*hash的key采用string的序列化方式*/
        template.setHashKeySerializer(stringRedisSerializer);
        /*value序列化方式采用jackson*/
        template.setValueSerializer(Jackson2JsonRedisSerializer);
        /*hash的value序列化方式采用jackson*/
        template.setHashValueSerializer(Jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }
}


redis的工具类

  • 直接使用redisTemplate操作redis时,需要很多代码,因此直接封装一个redisUtils,这样写代码方便一点。这个redisUtils交给spring容器实例化,使用时直接注解注入。
package com.kuang.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
 * Created with Intellij IDEA
 * Description:
 * user: CoderChen
 * Date: 2021-06-06
 * Time: 14:52
 */

/*在真实开发中,经常使用*/
@Component
public final class RedisUtil {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    // =============================common============================

    /**
     * 指定缓存失效时间
     *
     * @param key  键
     * @param time 时间(秒)
     */

    public boolean expire(String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据key 获取过期时间
     *
     * @param key 键 不能为null
     * @return 时间(秒) 返回0代表为永久有效
     */

    public long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }

    /**
     * 判断key是否存在
     *
     * @param key 键
     * @return true 存在 false不存在
     */

    public boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 删除缓存
     *
     * @param key 可以传一个值 或多个
     */

    @SuppressWarnings("unchecked")
    public void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
            }
        }
    }
// ============================String=============================

    /**
     * 普通缓存获取
     *
     * @param key 键
     * @return 值
     */

    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }

    /**
     * 普通缓存放入
     *
     * @param key   键
     * @param value 值
     * @return true成功 false失败
     */

    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 普通缓存放入并设置时间
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */

    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time,
                        TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 递增
     *
     * @param key   键
     * @param delta 要增加几(大于0)
     */

    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }

    /**
     * 递减
     *
     * @param key   键
     * @param delta 要减少几(小于0)
     */

    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }
// ================================Map=================================

    /**
     * HashGet
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     */

    public Object hget(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * 获取hashKey对应的所有键值
     *
     * @param key 键
     * @return 对应的多个键值
     */

    public Map<Object, Object> hmget(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * HashSet
     *
     * @param key 键
     * @param map 对应多个键值
     */

    public boolean hmset(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * HashSet 并设置时间
     *
     * @param key  键
     * @param map  对应多个键值
     * @param time 时间(秒)
     * @return true成功 false失败
     */

    public boolean hmset(String key, Map<String, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @return true 成功 false失败
     */

    public boolean hset(String key, String item, Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @param time  时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
     * @return true 成功 false失败
     */

    public boolean hset(String key, String item, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 删除hash表中的值
     *
     * @param key  键 不能为null
     * @param item 项 可以使多个 不能为null
     */

    public void hdel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }

    /**
     * 判断hash表中是否有该项的值
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     * @return true 存在 false不存在
     */

    public boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }

    /**
     * hash递增 如果不存在,就会创建一个 并把新增后的值返回
     *
     * @param key  键
     * @param item 项
     * @param by   要增加几(大于0)
     */

    public double hincr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }

    /**
     * hash递减
     *
     * @param key  键
     * @param item 项
     * @param by   要减少记(小于0)
     */

    public double hdecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }
// ============================set=============================

    /**
     * 根据key获取Set中的所有值
     *
     * @param key 键
     */

    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 根据value从一个set中查询,是否存在
     *
     * @param key   键
     * @param value 值
     * @return true 存在 false不存在
     */

    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将数据放入set缓存
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 成功个数
     */

    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 将set数据放入缓存
     *
     * @param key    键
     * @param time   时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */

    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0)
                expire(key, time);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 获取set缓存的长度
     *
     * @param key 键
     */

    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 移除值为value的
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 移除的个数
     */

    public long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
// ===============================list=================================

    /**
     * 获取list缓存的内容
     *
     * @param key   键
     * @param start 开始
     * @param end   结束 0 到 -1代表所有值
     */

    public List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 获取list缓存的长度
     *
     * @param key 键
     */

    public long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 通过索引 获取list中的值
     *
     * @param key   键
     * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0
     *              时,-1,表尾,-2倒数第二个元素,依次类推
     */

    public Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     */

    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     */

    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @return
     */

    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     * @return
     */

    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据索引修改list中的某条数据
     *
     * @param key   键
     * @param index 索引
     * @param value 值
     * @return
     */

    public boolean lUpdateIndex(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 移除N个值为value
     *
     * @param key 键
     * @param count 移除多少个
     * @param value 值
     * @return 移除的个数
     */

        public long lRemove (String key,long count, Object value){
            try {
                Long remove = redisTemplate.opsForList().remove(key, count,value);
                return remove;
            } catch (Exception e) {
                e.printStackTrace();
                return 0;
            }
        }
    }



测试代码

package com.kuang;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kuang.pojo.User;
import com.kuang.utils.RedisUtil;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.Set;

@SpringBootTest
class SpringbootRedisApplicationTests {
    @Autowired
    @Qualifier("redisTemplate")/*避免与源码重合,跳转到自定义redisTempalte*/
    private RedisTemplate redisTemplate;

    @Autowired
    private RedisUtil redisUtil;


    @Test
    void contextLoads() {
        /*
        * opsForList:操作list,类似string
        * opsForGeo 操作geo
        * opsForSet
        * .......
        * 和redis命令行一样。
        * */
       /*获取redis的连接对象*/
//        RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
//        connection.flushDb();
//        connection.flushAll();

        redisTemplate.opsForValue().set("mykey","kuangshen");
        System.out.println(redisTemplate.opsForValue().get("mykey"));
        redisTemplate.opsForSet().add("k5","v9");
        Set k5 = redisTemplate.opsForSet().members("k5");
        System.out.println(k5);



    }

    @Test
    public void test() throws JsonProcessingException {
        /*真实开发一般使用json传递数据*/
        User user = new User("kuangshen", 3);
//        String jsonUser = new ObjectMapper().writeValueAsString(user);
//        redisTemplate.opsForValue().set("user", jsonUser);
        redisTemplate.opsForValue().set("user", user);
        System.out.println(redisTemplate.opsForValue().get("user"));

    }

    @Test
    public void test1() {
        redisUtil.set("username""coderchen");
        System.out.println(redisUtil.get("username"));
    }

}


15. conf配置文件分析

熟悉基本配置

  • redis的配置文件的位置是/usr/local/redis/redis-6.06  redis.conf,但我们平时将它拷贝到/usr/local/bin/myredisconfig  redis.conf 来进行操作和修改配置文件。确保初始文件的安全。
  • 在redis中查看基本的配置

    config get *
    :获取全部的配置,可以查看配置是否修改成功。

  • Units单位

    配置大小单位,开头定义了一些基本的度量单位,只支持bytes,不支持bit

    对大小写不敏感


  • includes包含

    和spring配置文件相似,可以通过includes包含,redis.conf作为总文件,可以包含其他文件。(读一下前面的英文说明即知)


  • network网络配置

    bind 0.0.0.0 :绑定的主机地址

    protected-mode no:保护模式,默认为yes, 默认是开启,开启后Redis只会本地进行访问,拒绝外部访问.需要设置为yes

    port 6379 :默认端口


  • general通用

    daemonize yes:默认情况下,redis不作为守护进程运行,需要开启的话,改为yes

    supervised no:可以通过upstart和systemd管理redis守护进程。

    pidfile var/run/redis_6379.pid:以后台进程的方式运行redis,则需要指定pid文件。

    loglevel notice:日志级别。可选项有:

    debug:记录大量日志信息,适用于开发,测试阶段

    verbose:较多日志信息

    notice:适量日志信息,适用于生产环境

    warning:仅有部分重要,关键信息才会被记录

    logfile "":日志文件的位置,当指定为空字符串时,为标准输出。

    databases 16:设置数据库的数目,默认数据库时DB 0

    always-show-logo yes:是否总是显示logo


  • snapshopting快照

    save 900 1:900秒(15分钟)内至少一个key值改变(则进行数据库保存–持久化) save 300 10:300秒(5分钟内)至少10个key值改变(则进行数据库保存–持久化 save 60 10000:60秒(1分钟)内至少10000个key值改变(则进行数据库保存--持久化)

    stop-writes-on-bgsave-error yes:持久化出现错误后是否依然进行工作

    rdbcompression yes:使用压缩rdb文件  yes:压缩 但是需要一些cpu 的消耗  no:不压缩,需要更多的磁盘空间。

    rdbchecksum yes:是否校验rdb文件,更有利于文件的容错性,但是在保存rdb文件的时 候,会有大概10%的性能损耗

    dbfilename dump.rdb:dbfilenamerdb文件名称

    dir ./:dir 数据目录,数据库的写入会在这个目录。rdb、aof文件也会写在这个目录


  • security安全

    获得和设置密码:(需要先进入redis)

    config get requirepass

    config set requirepass "123456"

    然后每次登录redis时,都要使用auth验证:

    auth password

    注意:如果服务器使用redis不设置密码,信息可能会收到攻击导致丢失。

    在conf中设置密码,将requirepass 这个注释去掉,后面的字段改成对应的密码。

  • clients客户端

    maxclients 10000:设置能连上redis的最大客户端连接数量

  • memory management 内存管理

    maxmemory:redis配置的最大内存容量

    maxmemory-policy noeviction:maxmemory-policy 内存达到上限的处理策略

    volatile-lru:利用LRU算法移除设置过过期时间的key。

    volatile-random:随机移除设置过过期时间的key。

    volatile-ttl:移除即将过期的key,根据最近过期时间来删除(辅以TTL)

    allkeys-lru:利用LRU算法移除任何key。

    allkeys-random:随机移除任何key。

    noeviction:不移除任何key,只是返回一个写错误

  • append only mode 只追加模式

    这部分内容的详细配置说明见redis的持久化

常见的配置介绍

  1. redis默认不是以守护进程的方式进行,可以通过该配置项修改,使用yes启动守护进程

daemonize no —> daemonize yes

  1. 当Redis以守护进程方式运行时,Redis默认会把pid写入/var/run/redis.pid文件,可以通过pidfile指 定

pidfile var/run/redis.pid

  1. 指定Redis监听端口,默认端口为6379,作者在自己的一篇博文中解释了为什么选用6379作为默认 端口,因为6379在手机按键上MERZ对应的号码,而MERZ取自意大利歌女Alessia Merz的名字

port 6379

  1. 绑定的主机地址

bind 127.0.0.1

  1. 当 客户端闲置多长时间后关闭连接,如果指定为0,表示关闭该功能

timeout 300

  1. 指定日志记录级别,Redis总共支持四个级别:debug、verbose、notice、warning,默认为 verbose

loglevel verbose

  1. 日志记录方式,默认为标准输出,如果配置Redis为守护进程方式运行,而这里又配置为日志记录方 式为标准输出,则日志将会发送给/dev/null

logfile stdout

  1. 设置数据库的数量,默认数据库为0,可以使用SELECT 命令在连接上指定数据库id

databases 16

  1. 指定在多长时间内,有多少次更新操作,就将数据同步到数据文件,可以多个条件配合

save

Redis默认配置文件中提供了三个条件:

save 900 1

save 300 10

save 60 10000

分别表示900秒(15分钟)内有1个更改,300秒(5分钟)内有10个更改以及60秒内有10000个更 改

  1. 指定存储至本地数据库时是否压缩数据,默认为yes,Redis采用LZF压缩,如果为了节省CPU时 间,可以关闭该选项,但会导致数据库文件变的巨大

rdbcompression yes

  1. 指定本地数据库文件名,默认值为dump.rdb

dbfilename dump.rdb

  1. 指定本地数据库存放目录

dir ./

  1. 设置当本机为slav服务时,设置master服务的IP地址及端口,在Redis启动时,它会自动从master 进行数据同步

slaveof

  1. 当master服务设置了密码保护时,slav服务连接master的密码

masterauth

  1. 设置Redis连接密码,如果配置了连接密码,客户端在连接Redis时需要通过AUTH 命令提供密码, 默认关闭

requirepass foobared

  1. 设置同一时间最大客户端连接数,默认无限制,Redis可以同时打开的客户端连接数为Redis进程可 以打开的最大文件描述符数,如果设置 maxclients 0,表示不作限制。当客户端连接数到达限制时, Redis会关闭新的连接并向客户端返回max number of clients reached错误信息

maxclients 128

  1. 指定Redis最大内存限制,Redis在启动时会把数据加载到内存中,达到最大内存后,Redis会先尝 试清除已到期或即将到期的Key,当此方法处理 后,仍然到达最大内存设置,将无法再进行写入操作, 但仍然可以进行读取操作。Redis新的vm机制,会把Key存放内存,Value会存放在swap区

maxmemory

  1. 指定是否在每次更新操作后进行日志记录,Redis在默认情况下是异步的把数据写入磁盘,如果不 开启,可能会在断电时导致一段时间内的数据丢失。因为 redis本身同步数据文件是按上面save条件来 同步的,所以有的数据会在一段时间内只存在于内存中。默认为no

appendonly no

  1. 指定更新日志文件名,默认为appendonly.aof

appendfilename appendonly.aof

  1. 指定更新日志条件,共有3个可选值:

no:表示等操作系统进行数据缓存同步到磁盘(快)

always:表示每次更新操作后手动调用fsync()将数据写到磁盘(慢,安全)

everysec:表示每秒同步一次(折衷,默认值)

appendfsync everysec

  1. 指定是否启用虚拟内存机制,默认值为no,简单的介绍一下,VM机制将数据分页存放,由Redis将 访问量较少的页即冷数据swap到磁盘上,访问多的页面由磁盘自动换出到内存中

vm-enabled no

  1. 虚拟内存文件路径,默认值为/tmp/redis.swap,不可多个Redis实例共享

vm-swap-file tmp/redis.swap

  1. 将所有大于vm-max-memory的数据存入虚拟内存,无论vm-max-memory设置多小,所有索引数据 都是内存存储的(Redis的索引数据 就是keys),也就是说,当vm-max-memory设置为0的时候,其实是所有 value都存在于磁盘。默认值为0

vm-max-memory 0

  1. Redis swap文件分成了很多的page,一个对象可以保存在多个page上面,但一个page上不能被多 个对象共享,vm-page-size是要根据存储的 数据大小来设定的,作者建议如果存储很多小对象,page 大小最好设置为32或者64bytes;如果存储很大大对象,则可以使用更大的page,如果不 确定,就使用 默认值

vm-page-size 32

  1. 设置swap文件中的page数量,由于页表(一种表示页面空闲或使用的bitmap)是在放在内存中 的,,在磁盘上每8个pages将消耗1byte的内存

vm-pages 134217728

  1. 设置访问swap文件的线程数,最好不要超过机器的核数,如果设置为0,那么所有对swap文件的操作都 是串行的,可能会造成比较长时间的延迟。默认值为4

vm-max-threads 4

  1. 设置在向客户端应答时,是否把较小的包合并为一个包发送,默认为开启

glueoutputbuf yes

  1. 指定在超过一定的数量或者最大的元素超过某一临界值时,采用一种特殊的哈希算法

hash-max-zipmap-entries 64

hash-max-zipmap-value 512

  1. 指定是否激活重置哈希,默认为开启

activerehashing yes

  1. 指定包含其它的配置文件,可以在同一主机上多个Redis实例之间使用同一份配置文件,而同时各 个实例又拥有自己的特定配置文件

include path/to/local.conf

16. redis的持久化

为什么要持久化

  • redis是内存数据库,数据存储在内存中,这样便会存在一个问题:易丢失。当服务器关机或者redis服务器守护进程退出时,内存中的数据就会丢失。
  • 如果redis用作缓存,则数据丢失问题不大,重新从数据源加载进来就行。如果redis用作数据库,则丢失数据会造成很大的危害。
  • 因此为了保存redis中的数据,就要使用持久化技术来将数据持久化保存到硬盘上。
  • 持久化技术有两种:RDB 和 AOF

RDB

什么是RDB

  • RDB是一种快照存储持久化技术。具体就是将redis数据库指定时刻 的内存数据保存到硬盘的文件中,默认保存的文件名是dump.rdb,而在redis服务器启动时,可以通过加载dump.rdb文件来将内存数据恢复到redis中。

开启RDB持久化的方式(默认开启)

  • 有三种方式,save命令,bigsave命令,服务器配置自动触发。

  • save命令:

    同步操作。当客户端向服务器发送save命令持久化时,服务器会阻塞save命令后其他客户端的请求,直到数据同步完成。

    这便会产生一个问题:如果数据量大,同步时间长,会影响服务器执行其他请求。所以不建议使用这个命令。

# 在redis命令行执行save即可完成持久化
127.0.0.1:6379> save
OK


  • bgsave:

    异步操作。当客户端向服务器发送bgsave命令时,redis服务器主进程会forks一个子进程来解决数据持久化,主进程仍然可以接收其他请求。在子进程将数据保存到rdb文件之后,子进程便会退出。

# 异步保存数据到磁盘
127.0.0.1:6379> bgsave
Background saving started

  • 服务器配置自动触发:

除了通过客户端发送命令外,还有一种方式,就是在redis.conf配置文件中SNAPSHOTTING
下的save
来指定触发持久化的条件。

  # 900s内至少达到一条写命令
  save 900 1
  # 300s内至少达至10条写命令
  save 300 10
  # 60s内至少达到10000条写命令
  save 60 10000  

修改之后重新加载配置文件。

这种方式与bgsave命令类似,达到触发条件时,会fork一个子进程来进行数据同步。

对redis.conf配置文件中SNAPSHOTTING部分的说明

# rdb的触发条件,满足条件就会执行rdb
# 如果要禁用rdb的持久化,则将下面的save注释或者设置一个空字符串
save 900 1 # 900s内修改一次
save 300 10 # 300s内修改10次
save 60 10000 # 60s内修改10000次

#如果配置为no,表示你不在乎数据不一致或者有其他的手段发现和控制,默认为yes。
stop-writes-on-bgsave-error yes

#对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis会采用LZF算法进行压缩,如果你不想消耗CPU来进行压缩的话,可以设置为关闭此功能
rdbcompression yes

# 在存储快照后,还可以让redis使用CRC64算法来进行数据校验,但是这样做会增加大约10%的性能消耗,如果希望获取到最大的性能提升,可以关闭此功能。默认为yes。
rdbchecksum yes

# 文件的名字
dbfilename dump.rdb

# 持久化后文件的位置
dir /www/server/redis/


RDB方式的数据恢复

  • 获取备份文件的位置
127.0.0.1:6379> config get dir
1) "dir"
2) "/www/server/redis"
127.0.0.1:6379> 

  • 将备份文件移动到redis的/usr/local/bin目录下
  • 启动redis服务即可

RDB的优缺点

优点:

  • 与AOF方式相比,通过rdb文件恢复数据比较快。
  • rdb文件非常紧凑,适合于数据备份。
  • 通过RDB进行数据备,由于使用子进程生成,所以对Redis服务器性能影响较小。

缺点:

  • 如果服务器宕机的话,采用RDB的方式会造成某个时段内数据的丢失,比如我们设置10分钟同步一次或5分钟达到1000次写入就同步一次,那么如果还没达到触发条件服务器就死机了,那么这个时间段的数据会丢失。

  • 使用save命令会造成服务器阻塞,直接数据同步完成才能接收后续请求。

  • 使用bgsave命令在forks子进程时,如果数据量太大,forks的过程也会发生阻塞,另外,forks子进程会耗费内存。


AOF

什么是AOF

  • redis每执行一条写操作命令,就将该命令以追加的方式写入到一个日志文件里,然后重启的时候先读取这个文件里的命令,然后执行它。这种保存写操作命令到日志的持久化方式,就叫AOF。
  • 注意只有写操作命令,读操作命令不会进入日志,因为对恢复数据没帮助。

开启AOF的方式(默认关闭)

# redis.conf
############################## APPEND ONLY MODE ###############################
appendonly no # 默认为No,如果要开启持久化改为yes 

appendfilename "appendonly.aof"  # AOF持久化文件的名字

当开启后在命令行输入:

127.0.0.1:6379> set name coderchen
OK

会发现在/www/server/redis/下面的appendonly.aof文件里多了几行日志:

*3  # 表示当前命令有三个部分,每部分以$+数字开启
$3 # 表示这部分有三个字节
set
$4  # 表示这部分有四个字节
name
$9  # 表示这部分有九个字节
coderchen

AOF的数据恢复

正常恢复:

启动:设置Yes,修改默认的appendonly no,改为yes 
将有数据的aof文件复制一份保存到对应目录(config get dir)
恢复:重启redis然后重新加载

异常恢复:

 启动:设置Yes 
 故意破坏 appendonly.aof 文件! 
 修复:redis-check-aof --fix appendonly.aof 进行修复 
 恢复:重启 redis 然后重新加

appendonly.aof中配置的说明

############################## APPEND ONLY MODE ###############################
appendonly no # 开启AOF机制

appendfilename "appendonly.aof" #AOF的文件名

# appendfsync always
appendfsync everysec # 写入策略,可以是always,everysec,no。
# appendfsync no

no-appendfsync-on-rewrite no # 默认不重写aof文件

auto-aof-rewrite-percentage 100 # 设置重写的基准值
auto-aof-rewrite-min-size 64mb # 设置重写的基准值

aof-load-truncated yes # 当AOF文件末尾截断时,加载文件还是报错退出。yes:被截断的aof文件被加载,并打印日志通知用户。no:服务器报错并拒绝启动。

aof-use-rdb-preamble yes #开启混合持久化,更快的AOF重写和启动时数据恢复

dir /www/server/redis/ # aof文件的位置,这里要注意rdb和aof存放的位置一样,所以在APPEND ONLY MODE下面找不到dir,在SANPSHOTTING里

三种写入策略

  • 在上面的配置文件中,我们可以通过appendfsync选项指定写入策略,有三个选项。
# appendfsync always
appendfsync everysec # 写入策略,可以是always,everysec,no。
# appendfsync no

always:

客户端的每一个写操作都保存到aof文件中,这种策略很安全,但是每个写请求都要io操作,所以很慢。

everysec:

appendfsync的默认写入策略,每秒写入一次aof文件,因此,最多可能会丢失1s数据。

no:

redis服务器不负责写入aof,而是交由操作系统来处理什么时候写入aof文件。更快,但是也最不安全。

三种方式优缺点总结:

图片

AOF重写机制

  • AOF将客户端的所有写操作都追加到aof文件的末尾,但是如果进行对一个key的不断修改时,往往最后一个修改值才是有效的,但aof文件记录了每一次修改,最后恢复的时候按顺序恢复,这样不仅aof文件变得太大,而且恢复数据时速度变慢。
  • 因此引入了aof重写机制,通过重写aof可以生成恢复当前数据的最少命令集。

开启重写的方法:

no-appendfsync-on-rewrite no # 默认不重写aof文件 改为yes

好处:

  • 压缩aof文件,减少磁盘占用量。
  • 将aof的命令压缩为最小命令集,加快了数据恢复的速度。

AOF和RDB的比较

图片

17 redis 发布订阅

是什么

  • redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。redis客户端可以订阅任意数量的频道。
  • 下图展示了频道channel1,以及订阅这个频道的三个客户端—-client2,client5和client1之间的关系。
  • 当有新消息通过publish命令发送给频道channel1时,这个消息就会被发送给订阅它的三个用户端。

基本命令

  • 这些命令被广泛用于构建即时通信应用,比如网络聊天室和实时广播,实时提醒。

测试

Redis 中提供了一组命令,可以用于发布消息订阅频道取消订阅以及按照模式订阅

img

窗口1

127.0.0.1:6379> publish redischat "hello,redis"  #在redischat频道发布消息hello,redis
(integer) 0 #0表示当前订阅者为0个,1表示为1个
127.0.0.1:6379> publish redischat "hello,coderchen"
(integer) 1
127.0.0.1:6379> publish redischat "hello,redis"
(integer) 1
127.0.0.1:6379> 


窗口2

127.0.0.1:6379> subscribe redischat # 订阅频道redischat,可以接收redischat频道的消息
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "redischat"
3) (integer) 1
1) "message"
2) "redischat"
3) "hello,coderchen"
1) "message"
2) "redischat"
3) "hello,redis"

原理

  • redis是使用c实现的,通过分析redis源码里的pubsub.c文件,了解发布和订阅机制的底层实现,籍此加深对redis的理解。
  • Redis 通过 PUBLISH 、SUBSCRIBE 和 PSUBSCRIBE 等命令实现发布和订阅功能
  • 通过 SUBSCRIBE 命令订阅某频道后,redis-server 里维护了一个字典,字典的键就是一个个 channel ,而字典的值则是一个链表,链表中保存了所有订阅这个 channel 的客户端。SUBSCRIBE 命令的关 键,就是将客户端添加到给定 channel 的订阅链表中。
  • 通过 PUBLISH 命令向订阅者发送消息,redis-server 会使用给定的频道作为键,在它所维护的 channel 字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。
  • Pub/Sub 从字面上理解就是发布(Publish)与订阅(Subscribe),在Redis中,你可以设定对某一个 key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应 的消息。这一功能最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。
  • 使用场景:Pub/Sub构建实时消息系统             Redis的Pub/Sub系统可以构建实时的消息系统 比如很多用Pub/Sub构建的实时聊天系统的例子。
  • 一篇很好的文章:Redis 发布订阅,小功能大用处,真没那么废材!   https://www.cnblogs.com/goodAndyxublog/p/13716071.html

18 redis主从复制

概念

  • 主从复制是指将一台redis服务器的数据复制到其他的redis服务器。前者称为主节点(master/leader),后者称为从节点(slave/follower)。

  • 数据的复制是单向的,只能由主节点到从节点。master以写为主,slave以读为主。

  • 默认情况下每台redis服务器都是主节点,且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。

  • 主从复制的主要作用包括:

  1.数据冗余: 主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
  
  2.故障恢复: 当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复,实际上是一种服务的冗余。
  
  3.负载均衡: 在主从复制的基础上,配合读写分离可以由主节点提供写服务,由从节点提供读服务(即写redis数据时应用连接主节点,读redis数据时应用连接从节点),分担服务器负载。尤其是在写少读多的场景下,通过多个从节分担读负载,可以大大提高redis服务器的并发量。
  
  4.高可用基石: 除了上述作用以外,主从复制还是哨兵和集群能够实现的基础,因此说主从复制是redis高可用的基础。

  • 一般来说,要将redis运用于工程项目中,只使用一台redis是万万不能的,原因如下:
  1.从结构上:单个redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载。压力较大。
  2.从容量上,单个redis服务器内存容量有限,就算一台redis服务器内存容量为256g,也不能将所有内存用作redis存储内存,一般来说,单台redis最大使用内存不应该超过20G.

  • 电商网站上的商品,一般都是一次上传,无数次浏览的,也就是“多读少写”。对于这种场景,可以使用如下架构:

环境配置(伪集群)

  • 这部分参考狂神的pdf,不写了。

19 哨兵模式

概述

  • 主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费时费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式。redis从2.8开始正式提供了Sentinel(哨兵)架构来解决这个问题。
  • 它能够自动后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。
  • 哨兵模式是一种特殊的模式,首先redis提供了哨兵的命令,哨兵是一个独立的进程,它会独立运行。其原理是哨兵通过发送命令,等待redis服务器响应,从而监控多个redis实例。

哨兵的作用:

  • 通过发送命令,让redis服务器返回监控其运行状态,包括主服务器和从服务器。
  • 当哨兵检测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让他们切换主机。

多哨兵模式

  • 一个哨兵进程对redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。
  • 假设主服务器宕机,哨兵1会检测到这个结果,系统并不会马上进行failover 过程,仅仅是哨兵 1主观的认为主服务器不可用,这个现象称为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值是,那么哨兵之间会进行一次投票,投票的结果由一个哨兵发起,进行failover(故障转移)操作,切换成功后,会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线

哨兵模式的优缺点:

优点:

  • 哨兵集群模式是基于主从模式的,所有主从的优点,哨兵模式同样具有。
  • 主从可以切换,故障可以转移,系统可用性更好。
  • 哨兵模式是主从模式的升级,系统更加健壮,可用性更高。

缺点:

  • redis较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。
  • 实现哨兵模式的配置也不简单,甚至可以说有些繁琐。

哨兵配置说明:

  • 在/www/server/redis下有一个配置文件sentinel.conf。
port 26379 #哨兵sentinel实例运行的端口 默认26379
daemonize no

pidfile /var/run/redis-sentinel.pid

logfile ""

dir /tmp #哨兵模式工作目录

------------------------------------------------------------------
# 哨兵sentinel监控的redis主节点的 ip port
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
# master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。
# quorum 配置多少个sentinel哨兵统一认为master主节点失联 那么这时客观上认为主节点失联了
sentinel monitor mymaster 127.0.0.1 6379 2
----------------------------------------------------------------
# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码
# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码
# sentinel auth-pass <master-name> <password>
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd
----------------------------------------------------------------
# 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒
# sentinel down-after-milliseconds <master-name> <milliseconds>
sentinel down-after-milliseconds mymaster 30000

acllog-max-len 128

# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步这个数字越小,完成failover所需的时间就越长,
#但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。
#可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。
# sentinel parallel-syncs <master-name> <numslaves>
sentinel parallel-syncs mymaster 1
--------------------------------------------------------------------
# 故障转移的超时时间 failover-timeout 可以用在以下这些方面:
#1. 同一个sentinel对同一个master两次failover之间的间隔时间。
#2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。
#3.当想要取消一个正在进行的failover所需要的时间。
#4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了
# 默认三分钟
# sentinel failover-timeout <master-name> <milliseconds>
sentinel failover-timeout mymaster 180000


sentinel deny-scripts-reconfig yes
SENTINEL resolve-hostnames no
SENTINEL announce-hostnames no

推荐文章

你管这破玩意叫哨兵?https://mp.weixin.qq.com/s/6qhK1oHXP_VzfgR9BjYVJg

20 缓存雪崩,击穿和穿透

  • 我们使用的传统的数据库如mysql,它的数据存储在硬盘上,读写起来速度比较慢,不支持大量的请求访问。而redis是内存数据库,数据在内存上,读写速度是硬盘的好多倍,因此我们引入了redis做缓存,来减少对mysql的直接访问,提高系统性能。
  • 引入缓存后便会有缓存异常的三个问题:缓存雪崩,缓存击穿,缓存穿透。
图片

缓存雪崩

  • 缓存雪崩是指redis中的大量缓存过期或者redis故障,导致大量的用户请求无法在redis中处理,全部请求直接访问数据库从而导致数据库压力骤增,严重会造成数据库宕机,从而导致一系列连锁反应,造成整个系统崩溃。
图片

大量缓存过期的处理策略:

  • 均匀设置过期时间,避免同一时间过期
  • 互斥锁,保证同一时间只有一个应用在构建缓存。
  • 双Key策略,主key设置过期内容,备key永久,主Key过期时,返回备key的内容。
  • 后台更新缓存,定时更新,消息对列通知更新。

redis故障的处理策略

  • 服务熔断或者请求限流机制
  • 构建redis缓存高可靠集群

缓存击穿

  • 缓存中频繁被访问的数据称为热点数据
  • 如果缓存中某个热点数据过期了,这时大量的请求访问该热点数据,就无法从缓存中读取,只能直接访问数据库,数据库很容易就被高并发的请求冲垮。
图片

注意:缓存击穿和缓存雪崩很相似,都是在redis缓存中获取不到数据,进而只能访问数据库。可以认为缓存击穿是缓存雪崩的一个子集。

处理策略:

  • 互斥锁
  • 不给热点数据设置过期时间,由后台更新缓存。

缓存穿透

  • 缓存穿透是指用户请求的数据,既不在缓存中,也不在数据库中。导致请求在访问缓存时,发现缓存缺失,再去访问数据库时,发现数据库也没有要访问的数据,没办法来构建缓存来服务后续的请求。那么有大量这样的请求时,会给数据库造成很大的压力。
图片

处理策略:

  • 非法请求的限制:判断请求是否合理,如果不合理就直接返回错误。
  • 缓存空值或者默认值:针对一些请求在缓存中设置默认值或者空值,使其不在访问数据库。
  • 使用布隆过滤器快速判断数据是否存在,避免通过查询数据库来判断数据是否存在。

布隆过滤器:

简单的说就是在redis或者数据库前设置一道过滤程序,来拦截所有要访问redis或者数据库的请求,先判断请求的数据在redis或者数据库中是否存在,如果存在就放行。这样可以避免对数据库的大量访问。

参考的大佬的文章:

小林coding的 《再也不怕,缓存雪崩、击穿、穿透!》(写的很详细!)

https://mp.weixin.qq.com/s/_StOUX9Nu-Bo8UpX7ThZmg

21. NoSQL概述

为什么要用NoSQL

  • 随着数据量,访问量的增大,传统mysql数据库出现性能问题,这时引入缓存技术来缓解数据库的压力,优化数据库结构和索引。但随着数据量访问量进一步增大,mysql主从读写分离,mysql集群等都无法满足要求。mysql数据库扩展性达到瓶颈。
  • 这时传统的关系型数据库难以应付高并发,暴露出很多问题,而非关系型数据库是为了解决当前高并发问题和大规模数据等问题而诞生的。因此非关系型数据库非常适用当前的环境。

什么是NoSQL

NoSQL

NoSQL=Not Only SQL 意思是不仅仅是SQL

NoSQL的特点

  1. 易扩展
  2. 大数据量高性能
有非常高的读写性能。特别是在大数据量下。

  1. 多样灵活的数据模型
NoSQL无需事先为要存储的数据建立字段,随时可以存储自定义的数据格式。而在关系数据库里,增删字段是一件非常麻烦的事。

  1. 传统的RDBMS和NoSQL的比较
传统的关系型数据库 RDBMS
- 高度组织化结构化数据
- 结构化查询语言(SQL)
- 数据和关系都存储在单独的表中
- 数据操纵语言,数据定义语言
- 严格的一致性
- 基础事务
NoSQL
- 代表着不仅仅是SQL
- 没有声明性查询语言
- 没有预定义的模式
- 键值对存储,列存储,文档存储,图形数据库
- 最终一致性,而非ACID属性
- 非结构化和不可预知的数据
- CAP定理
- 高性能,高可用性 和 可伸缩性

NoSQL的四大分类

  • kv键值型数据库:redis,memcache
  • 文档型数据库:MongoDB,CouchDB
  • 列存储数据库:Hbase,Cassandra,分布式文件系统
  • 图关系型数据库:Neo4J,InfoGrid

四者对比:

CAP+BASE

  • 传统的关系型数据库遵循ACID原则。A:原子性 C:一致性 I:隔离性 D:持久性
  • 分布式存储系统中遵循CAP理论。但只能是实现CAP三个理论中的两个
CAP理论:
 C:consisiency 强一致性
 A:availability 可用性
 P:partition tolerance 分区容错性
其中由于当前网络硬件下肯定会出现延迟丢包,所以分区容错性必须实现。
所以只能在一致性和可用性之间权衡,没有NOSQL可以同时保证这三点。

  • CAP理论的核心是一个分布式系统不可能同时很好的满足一致性,可用性和分区容错性这三个需求, 最多只能同时较好的满足两个。因此,根据 CAP 原理将 NoSQL 数据库分成了满足 CA 原则、满足 CP 原则和满足 AP 原则三 大类:
CA - 单点集群,满足一致性,可用性的系统,通常在可扩展性上不太强大。
CP - 满足一致性,分区容忍必的系统,通常性能不是特别高。
AP - 满足可用性,分区容忍性的系统,通常可能对一致性要求低一些。

  • BASE理论

    BASE理论是对CAP中一致性和可用性权衡的结果。其核心思想是即使无法做到强一致性,但 每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性

 BASE理论:
 
 基本可用(Basically Available): 基本可用是指分布式系统在出现故障的时候,允许损失部分可用性,即保证核心可用。电商大促时,为了应对访问量激增,部分用户可能会被引导到降级页面,服务层也可能只提供降级服务。这就是损失部分可用性的体现。
 
 软状态(Soft State): 软状态是指允许系统存在中间状态,而该中间状态不会影响系统整体可用性。分布式存储中一般一份数据至少会有三个副本,允许不同节点间副本同步的延时就是软状态的体现。MySQL Replication 的异步复制也是一种体现。
 
 最终一致性(Eventual Consistency): 最终一致性是指系统中的所有数据副本经过一定时间后,最
 终能够达到一致的状态。弱一致性和强一致性相反,最终一致性是弱一致性的一种特殊情况。
 

  • 分布式:

分布式:不同的多台服务器上面部署不同的服务模块(工程),他们之间通过Rpc通信和调用,对外 提供服务和组内协作。

  • 集群:

不同的多台服务器上面部署相同的服务模块,通过分布式调度软件进行统一的调度,对外提供 服务和访问。

22. 总结一下本文参考的大佬的优秀文章:

  • 狂神的redis:  https://www.bilibili.com/video/BV1S54y1R7SB
  • 彻底理解 Redis 的持久化机制:RDB和AOF https://mp.weixin.qq.com/s/ij67XYJgYK2kYrAlSDz3sw
  • 宕机了,缓存数据没了(AOF)   https://mp.weixin.qq.com/s/AFd2rOKD7RViadUjBGiobw
  • 再也不怕,缓存雪崩、击穿、穿透! https://mp.weixin.qq.com/s/_StOUX9Nu-Bo8UpX7ThZmg
  • 你管这破玩意叫 RDB  https://mp.weixin.qq.com/s/GXnNSLqiAN0HDQMYE0yhug
  • 破玩意 | Redis 为什么那么快  https://mp.weixin.qq.com/s/keVI4Fn8N42VhIhODkuqBA
  • Redis 发布订阅,小功能大用处,真没那么废材 !  https://www.cnblogs.com/goodAndyxublog/p/13716071.html
  • 你管这破玩意叫哨兵?  https://mp.weixin.qq.com/s/6qhK1oHXP_VzfgR9BjYVJg


文章转载自coderchen,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论