暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

cassandra 基础学习1

原创 不忘初心方得始终 2021-11-15
784

Apache Cassandra(社区内一般简称为C*)是一套开源分布式NoSQL数据库系统。它最初由Facebook开发,用于改善电子邮件系统的搜寻效能的简单格式数据,集Google BigTable的数据模型与Amazon Dynamo的完全分布式架构于一身。Facebook于2008将 Cassandra 开源,此后,由于Cassandra良好的可扩展性和性能,被 Apple[1], Comcast[2],Instagram[3], Spotify[4], eBay[5], Rackspace[6], Netflix[7]等知名网站所采用,成为了一种流行的分布式结构化数据存储方案。

1 简介

1.1 结构模型

2007 年 Facebook 为了解决消息收件箱搜索问题( Inbox Search problem)而开始设计 Cassandra 项目。 当时 Facebook 遇到了传统的方法难以解决的超大数据量存储可扩展性问题。具体来说,项目团队需要处理大量的消息副本、消息的反向索引等不同形式的数据,需要处理很多随机读和并发随机写操作。

1.1.1 数据模型

Cassandra使用了Google 设计的 BigTable的数据模型,与面向行(row)的传统的关系型数据库键值存储的key-value数据库不同,Cassandra使用的是宽列存储模型(Wide Column Stores)[8],每行数据由row key唯一标识之后,可以有最多20亿个列[11],每个列有一个column key标识,每个column key下对应若干value。这种模型可以理解为是一个二维的key-value存储,即整个数据模型被定义成一个类似map>的类型。

旧版的Cassandra与客户端交互的方法是通过thrift,而目前新版本的Cassandra采用与SQL语言类似的CQL语言[12]来实现数据模型的定义和数据的读写。其中BigTable中的列族(Column Family)在Cassandra中被称作类似关系型数据库中的称呼——表(table),而Cassandra/BigTable中的row key和column key并称为主键(primary key)。[13]

Cassandra的row key决定了该行数据存储在哪些节点中,因此row key需要按哈希来存储,不能顺序的扫描或读取,而一个row内的column key是顺序存储的,可以进行有序的扫描或范围查找[13]

1.1.2 存储模型

与BigTable和其模仿者HBase不同,Cassandra的数据并不存储在分布式文件系统如GFSHDFS中,而是直接存于本地。与BigTable一样,Cassandra也是日志型数据库,即把新写入的数据存储在内存的Memtable中并通过磁盘中的CommitLog来做持久化,内存填满后将数据按照key的顺序写进一个只读文件SSTable中,每次读取数据时将所有SSTable和内存中的数据进行查找和合并[14][15]。这种系统的特点是写入比读取更快[16],因为写入一条数据是顺序计入commit log中,不需要随机读取磁盘以及搜索。

1.1.3 分布式架构

Cassandra的系统架构与Dynamo类似,是基于一致性哈希的完全P2P架构,每行数据通过哈希来决定应该存在哪个或哪些节点中[17]。集群没有master的概念,所有节点都是同样的角色,彻底避免了整个系统的单点问题导致的不稳定性,集群间的状态同步通过Gossip协议来进行P2P的通信。每个节点都把数据存储在本地,每个节点都接受来自客户端的请求。每次客户端随机选择集群中的一个节点来请求数据,对应接受请求的节点将对应的key在一致性哈希的环上定位是哪些节点应该存储这个数据,将请求转发到对应的节点上,并将对应若干节点的查询反馈返回给客户端。

在一致性、可用性和分区耐受能力(CAP)的折衷问题上,Cassandra和Dynamo一样比较灵活。Cassandra的每个keyspace可配置一行数据会写入多少个节点(设这个数为N),来保证数据不因为机器宕机或磁盘损坏而丢失数据,即保证了CAP中的P。用户在读写数据时可以指定要求成功写到多少个节点才算写入成功(设为W),以及成功从多少个节点读取到了数据才算成功(设为R)。可推理得出,当W+R>N时,读到的数据一定是上一次写入的,即维护了强一致性,确保了CAP中的C。当W+R<=N时,数据是最终一致性因为存在一段时间可能读到的并不是最新版的数据。当W=N或R=N时,意味着系统只要有一个节点无响应或宕机,就有一部分数据无法成功写或者读,即失去了CAP中的可用性A。因此,大多数系统中,都将N设为3,W和R设为QUORUM,即“过半数”——在N为3时QUORUM是2。

1.1.4 支持的操作

Cassandra支持对一列数据进行insert、update、或delete操作。其中insert和update虽然语法略有区别,但语义上等价,即可以针对已经存在的行进行update或insert一个不存在的行。

1.1.5 轻量级事务

从2.0版开始,Cassandra支持轻量级事务。这种事务被称为“compare-and-set”,简称CAS。通过paxos算法实现在满足某条件后才修改数据否则不修改。目前支持”insert if not exist”、”update if col=value”、”delete if exist”等几种操作。

1.2 特性

1.2.1 分布式和去中心化(Distributed and Decentralized)

Cassandra 是分布式的,这意味着它可以运行在多台机器上,并呈现给用户一个一致的整体。事实上,在一个节点上运行 Cassandra 是没啥用的,虽然我们可以这么做,并且这可以帮助我们了解它的工作机制,但是你很快就会意识到,需要多个节点才能真正了解 Cassandra 的强大之处。它的很多设计和实现让系统不仅可以在多个节点上运行,更为多机架部署进行了优化,甚至一个 Cassandra 集群可以运行在分散于世界各地的数据中心上。你可以放心地将数据写到集群的任意一台机器上,Cassandra 都会收到数据。

对于很多存储系统(比如 MySQL, Bigtable),一旦你开始扩展它,就需要把某些节点设为主节点,其他则作为从节点。但 Cassandra 是无中心的,也就是说每个节点都是一样的。与主从结构相反,Cassandra 的协议是 P2P 的,并使用 gossip 来维护存活或死亡节点的列表。关于 gossip 可以参见《分布式原理:一文了解 Gossip 协议》

去中心化这一事实意味着 Cassandra 不会存在单点失效。Cassandra 集群中的所有节点的功能都完全一样, 所以不存在一个特殊的主机作为主节点来承担协调任务。有时这被叫做服务器对称(server symmetry)。

综上所述,Cassandra 是分布式、无中心的,它不会有单点失效,所以支持高可用性。

1.2.2 弹性可扩展(Elastic Scalability)

可扩展性是指系统架构可以让系统提供更多的服务而不降低使用性能的特性。仅仅通过给现有的机器增加硬件的容量、内存进行垂直扩展,是最简单的达到可扩展性的手段。而水平扩展则需要增加更多机器,每台机器提供全部或部分数据,这样所有主机都不必负担全部业务请求。但软件自己需要有内部机制来保证集群中节点间的数据同步。

弹性可扩展是指水平扩展的特性,意即你的集群可以不间断的情况下,方便扩展或缩减服务的规模。这样,你就不需要重新启动进程,不必修改应用的查询,也无需自己手工重新均衡数据分布。在 Cassandra 里,你只要加入新的计算机,Cassandra 就会自动地发现它并让它开始工作。

1.2.3 高可用和容错(High Availability and Fault Tolerance)

从一般架构的角度来看,系统的可用性是由满足请求的能力来量度的。但计算机可能会有各种各样的故障,从硬件器件故障到网络中断都有可能。如何计算机都可能发生这些情况,所以它们一般都有硬件冗余,并在发生故障事件的情况下会自动响应并进行热切换。对一个需要高可用的系统,它必须由多台联网的计算机构成,并且运行于其上的软件也必须能够在集群条件下工作,有设备能够识别节点故障,并将发生故障的中端的功能在剩余系统上进行恢复。

Cassandra 就是高可用的。你可以在不中断系统的情况下替换故障节点,还可以把数据分布到多个数据中心里,从而提供更好的本地访问性能,并且在某一数据中心发生火灾、洪水等不可抗灾难的时候防止系统彻底瘫痪。

1.2.4 可调节的一致性(Tuneable Consistency)

2000年,加州大学伯克利分校的 Eric Brewer 在 ACM 分布式计算原理会议提出了著名的 CAP 定律。CAP 定律表明,对于任意给定的系统,只能在一致性(Consistency)、可用性(Availability)以及分区容错性(Partition Tolerance)之间选择两个。关于 CAP 定律的详细介绍可参见《分布式系统一致性问题、CAP定律以及 BASE 理论》以及《一篇文章搞清楚什么是分布式系统 CAP 定理》。所以 Cassandra 在设计的时候也不得不考虑这些问题,因为分区容错性这个是每个分布式系统必须考虑的,所以只能在一致性和可用性之间做选择,而 Cassandra 的应用场景更多的是为了满足可用性,所以我们只能牺牲一致性了。但是根据 BASE 理论,我们其实可以通过牺牲强一致性获得可用性。

Cassandra 提供了可调节的一致性,允许我们选定需要的一致性水平与可用性水平,在二者间找到平衡点。因为客户端可以控制在更新到达多少个副本之前,必须阻塞系统。这是通过设置副本因子(replication factor)来调节与之相对的一致性级别。

通过副本因子(replication factor),你可以决定准备牺牲多少性能来换取一致性。 副本因子是你要求更新在集群中传播到的节点数(注意,更新包括所有增加、删除和更新操作)。

客户端每次操作还必须设置一个一致性级别(consistency level)参数,这个参数决定了多少个副本写入成功才可以认定写操作是成功的,或者读取过程中读到多少个副本正确就可以认定是读成功的。这里 Cassandra 把决定一致性程度的权利留给了客户自己。

所以,如果需要的话,你可以设定一致性级别和副本因子相等,从而达到一个较高的一致性水平,不过这样就必须付出同步阻塞操作的代价,只有所有节点都被更新完成才能成功返回一次更新。而实际上,Cassandra 一般都不会这么来用,原因显而易见(这样就丧失了可用性目标,影响性能,而且这不是你选择 Cassandra 的初衷)。而如果一个客户端设置一致性级别低于副本因子的话,即使有节点宕机了,仍然可以写成功。

总体来说,Cassandra 更倾向于 CP,虽然它也可以通过调节一致性水平达到 AP;但是不推荐你这么设置。

1.2.5 面向行(Row-Oriented)

Cassandra 经常被看做是一种面向列(Column-Oriented)的数据库,这也并不算错。它的数据结构不是关系型的,而是一个多维稀疏哈希表。稀疏(Sparse)意味着任何一行都可能会有一列或者几列,但每行都不一定(像关系模型那样)和其他行有一样的列。每行都有一个唯一的键值,用于进行数据访问。所以,更确切地说,应该把 Cassandra 看做是一个有索引的、面向行的存储系统。

Cassandra 的数据存储结构基本可以看做是一个多维哈希表。这意味着你不必事先精确地决定你的具体数据结构或是你的记录应该包含哪些具体字段。这特别适合处于草创阶段,还在不断增加或修改服务特性的应用。而且也特别适合应用在敏捷开发项目中,不必进行长达数月的预先分析。对于使用 Cassandra 的应用,如果业务发生变化了,只需要在运行中增加或删除某些字段就行了,不会造成服务中断。

当然, 这不是说你不需要考虑数据。相反,Cassandra 需要你换个角度看数据。在 RDBMS 里, 你得首先设计一个完整的数据模型, 然后考虑查询方式, 而在 Cassandra 里,你可以首先思考如何查询数据,然后提供这些数据就可以了。

1.2.6 灵活的模式(Flexible Schema)

Cassandra 的早期版本支持无模式(schema-free)数据模型,可以动态定义新的列。 无模式数据库(如 Bigtable 和 MongoDB)在访问大量数据时具有高度可扩展性和高性能的优势。 无模式数据库的主要缺点是难以确定数据的含义和格式,这限制了执行复杂查询的能力。

为了解决这些问题,Cassandra 引入了 Cassandra Query Language(CQL),它提供了一种通过类似于结构化查询语言(SQL)的语法来定义模式。 最初,CQL 是作为 Cassandra 的另一个接口,并且基于 Apache Thrift 项目提供无模式的接口。 在这个过渡阶段,术语“模式可选”(Schema-optional)用于描述数据模型,我们可以使用 CQL 的模式来定义。并且可以通过 Thrift API 实现动态扩展以此添加新的列。 在此期间,基础数据存储模型是基于 Bigtable 的。

从 3.0 版本开始,不推荐使用基于 Thrift API 的动态列创建的 API,并且 Cassandra 底层存储已经重新实现了,以更紧密地与 CQL 保持一致。 Cassandra 并没有完全限制动态扩展架构的能力,但它的工作方式却截然不同。 CQL 集合(比如 list、set、尤其是 map)提供了在无结构化的格式里面添加内容的能力,从而能扩展现有的模式。CQL 还提供了改变列的类型的能力,以支持 JSON 格式的文本的存储。

因此,描述 Cassandra 当前状态的最佳方式可能是它支持灵活的模式。

1.2.7 高性能(High Performance)

Cassandra 在设计之初就特别考虑了要充分利用多处理器和多核计算机的性能,并考虑在分布于多个数据中心的大量这类服务器上运行。它可以一致而且无缝地扩展到数百台机器,存储数 TB 的数据。Cassandra 已经显示出了高负载下的良好表现,在一个非常普通的工作站上,Cassandra 也可以提供非常高的写吞吐量。而如果你增加更多的服务器,你还可以继续保持 Cassandra 所有的特性而无需牺牲性能。

1.3 使用场景

我们已经介绍了 Cassandra 的主要特点,对 Cassandra 的长处有了一定的理解。尽管 Cassandra 设计精巧,功能出色,但也不能胜任所有的工作。所以我们来介绍一下 Cassandra 最适合的场景。

1.3.1 大规模部署

你可能不会开着一辆轻型的小卡车去取干洗的衣服,小卡车显然不适合这种工作。Cassandra 的很多精巧设计都专注于高可用、可调一致性、P2P 协议、无缝扩展等,这些都是 Cassandra 的卖点。这些特性在单节点工作时都是没有意义的,更无法实现它的全部能力。

但是,单节点关系数据库在很多情况下可能正是我们需要的。所以你需要做一些评估。考虑你的期望的流量、吞吐需求以及 SAL 等。关于评估没有什么硬性的指标和要求。但如果你认为有几种关系型数据库可以很好地应付你的流量,提供不错的性能,那可能选关系型数据库更好。简单地说,这是因为 RDBMS 更易于在单机上运行,对你来说也更熟悉。

但是,如果你认为需要至少几个节点才能支撑你的业务,那 Cassandra 就是个不错的选择。如果你的应用可能需要数十个节点,那 Cassandra 可能就是个很棒的选择了。

1.3.2 写密集、统计和分析型工作

考虑一下你的应用的读写比例,Cassandra 是为优异的写吞吐量而特别优化的。

许多早期使用 Cassandra 的产品都用于存储用户状态更新、社交网络、建议/评价以及应用统计等。这些都是 Cassandra 很好的应用场景,因为这些应用大都是写多于读的,并且更新可能随时发生并伴有突发的峰值。事实上,支撑应用负载需要很高的多客户线程并发写性能,这正是 Cassandra 的主要特性。

根据项目的 wiki,Cassandra 已经被用于开发了多种不同的应用,包括窗口化的时间序列数据库,用于文档搜索的反向索引,以及分布式任务优先级队列。

1.3.3 地区分布

Cassandra 直接支持多地分布的数据存储,Cassandra 可以很容易配置成将数据分布到多个数据中心的存储方式。如果你有一个全球部署的应用,那么让数据贴近用户会获得不错的性能收益,Cassandra 正适合这种应用场合。

1.3.4 变化的应用

如果你正在“初创阶段”,业务会不断改进,Cassandra 这种灵活的模式的数据模型可能更适合你。这让你的数据库能更快地跟上业务改进的步伐。

2、使用

2.1 安装

前提

  1. Java >= 1.8 (OpenJDK and Oracle JVMS have been tested)
  2. Python 2.7 (for cqlsh)

开始

1
2
3
# 下载对应的版本ex:https://apache.website-solution.net/cassandra/3.11.8/apache-cassandra-3.11.8-bin.tar.gz
$ tar -zxvf apache-cassandra-$VERSION.tar.gz
$ cd apache-cassandra-$VERSION

使用-f参数运行启动脚本,Cassandra前台运行并登录到标准输出。可以使用ctrl-C停止它

1
$ bin/cassandra -f

可以去设置环境变量,这里先不设置了。

Now let’s try to read and write some data using the Cassandra Query Language:

1
$ bin/cqlsh

测试

The command line client is interactive so if everything worked you should be sitting in front of a prompt:

1
2
3
4
Connected to Test Cluster at localhost:9160.
[cqlsh 2.2.0 | Cassandra 1.2.0 | CQL spec 3.0.0 | Thrift protocol 19.35.0]
Use HELP for help.
cqlsh>

As the banner says, you can use ‘help;’ or ‘?’ to see what CQL has to offer, and ‘quit;’ or ‘exit;’ when you’ve had enough fun. But lets try something slightly more interesting:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
cqlsh> CREATE KEYSPACE schema1
WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
cqlsh> USE schema1;
cqlsh:Schema1> CREATE TABLE users (
user_id varchar PRIMARY KEY,
first varchar,
last varchar,
age int
);
cqlsh:Schema1> INSERT INTO users (user_id, first, last, age)
VALUES ('jsmith', 'John', 'Smith', 42);
cqlsh:Schema1> SELECT * FROM users;
user_id | age | first | last
---------+-----+-------+-------
jsmith | 42 | john | smith
cqlsh:Schema1>

2.2 配置

2.2.1 设置cassandra数据目录

  • data_file_directories:为数据文件目录
  • commitlog_directory:为日志文件目录
  • saved_caches_directory:为缓存文件目录
1
2
3
4
5
mkdir -p /data/cassandra/data /data/cassandra/saved_caches /data/cassandra/commitlog
# 把目录归属改成操作用户
sudo chown -R cassandra:cassandra /data/cassandra/


1
2
3
4
vim conf/cassandra.yaml
commitlog_directory: /data/cassandra/commitlog
saved_caches_directory: /data/cassandra/saved_caches
data_file_directories: /data/cassandra/data

2.2.2 设置账号密码

1
https://blog.xiaoxiaomo.com/2017/11/21/Cassandra-%E7%94%A8%E6%88%B7%E5%AF%86%E7%A0%81%E8%AE%BE%E7%BD%AE/

2.2.3 设置远程访问

1
2
3
4
5
6
7
8
9
10
11
# 这两个不知道要不要加
broadcast_address: [node-ip]

start_rpc: true
rpc_address: 0.0.0.0
broadcast_rpc_address: [node-ip]
listen_address: [node-ip]
# 种子服务器IP用英文逗号分隔,为内网地址,不要将整个集群的节点都设置成种子节点。选出两个最稳定的节点就行
seed_provider:
- class_name: ...
- seeds:"[node-ip]"
  • data_file_directories:数据文件所在的一个或多个目录。
  • commitlog_directory:提交日志文件所在的目录。
  • saved_caches_directory:保存的缓存所在的目录。
  • hints_directory:提示所在的目录。

出于性能原因,如果您有多个磁盘,请考虑将提交日志和数据文件放在不同的磁盘上。

2.2.4 配置参考

cluster_name # 修改为自己喜欢的名字,后面运行后再修改比较麻烦

集群维度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
cluster_name: //集群的名字,默认是Test Cluster,用''括起来。不同的cluster name的节点无法组成一个集群.# 修改为自己喜欢的名字,后面运行后再修改比较麻烦

num_tokens: 256 //集群中单节点的的分配token数,因为使用vnode,也就是vnode的个数,每个token是随机生成的,此外如果不使用vnode的话,可以使用每个节点预分配一个初始的token,通过下面的配置下;

initial_token: //如果集群不想使用vnode的话,需要手工给每个节点进行token配置,手工计算节点的token数,但是扩容的时候建议是成倍扩容。vnode不需要

partitioner: //集群的数据分配算法,也就是常见的一致性hash算法中计算hash的那个模块,其中默认使用org.apache.cassandra.dht.Murmur3Partitioner,也就是现在比较推荐的mum3的hash策略,也有别的RandomPartitioner(md5),OrderPreservingPartitioner,ByteOrderedPartitioner(字典序),对scan比较亲和,但是会有key倾斜

seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
# seeds is actually a comma-delimited list of addresses.
# Ex: ",,"
- seeds: "127.0.0.1"
//这个配置相对比较重要,主要是集群种seed节点配置,需要所有节点一样,而且数量有一定限制,取决于集群规模,如果是10个节点,推荐2个是ok的。

endpoint_snitch: //集群的snitch策略,涉及到集群的副本节点管理的测,默认SimpleSnitch
单节点维度

涉及到单节点的相关配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
hints_directory: //涉及到的hint的目录,默认的使用 /var/lib/cassandra/hints,如果某个节点挂掉,且这个节点负责挂掉节点hint记录,那么数据就会记到这个目录下面

authenticator: //认证相关,默认是AllowAllAuthenticator,所有都可以通过认证,还有使用账户密码认证,这个需要集群种所有节点使用一种相同配置

authorizer: //鉴权,默认是所有都可以有任何操作,可以通过配置CassandraAuthorizer进行不同鉴权,同样各个节点配置需要一样;

data_file_directories: //数据文件的放置目录,默认是使用/var/lib/cassandra/data,推荐多快盘组合使用。

commitlog_directory: //commitlog的配置目录,默认是 /var/lib/cassandra/commitlog,推荐较好的磁盘配置

cdc_enabled: //是否使用cdc 功能,默认是关闭,如果开启,需要表配置也进行开启

disk_failure_policy: //单机数据盘磁盘坏盘处理配置,默认stop,不接受gossip响应以及停掉cilent的服务。但是可以通过jmx访问

commit_failure_policy: //commitlog数据盘的坏盘策略,默认是stop

commitlog_sync: //commitlog的 数据sync策略,默认是periodic,这个会影响系统的写性能;
commitlog_sync_period_in_ms: // period模式下的flush 频率10000ms;也可以使用另一种sync策略,与period是二选一的;
commitlog_sync: //如果是batch 下面就是batch sync的时间配置。
commitlog_sync_batch_window_in_ms: 2

commitlog_segment_size_in_mb: //commitlog每隔多大切一下,默认是32m
commitlog_compression: //支持commitlog的压缩,默认是lz4

concurrent_reads: //节点读线程池线程数,默认32,io bound,建议是16*磁盘数
concurrent_writes: //节点写线程池线程数,默认32,cpu bound,建议是8*cpu core数
concurrent_counter_writes: //counter线程池数,默认32,与read一样

memtable_allocation_type: //memtable 的内存管理,默认是heap_buffers,也就是on heap的buffer管理

listen_address: //节点监听服务的地址,本地节点的bind地址接口,如果配置文件不容易管理,可以使用统一的网卡bind配置:listen_interface: eth0
rpc_address: //4.0之前需要thrift,所有这个可以配置下,默认localhost
更多配置

https://www.cnblogs.com/ilifeilong/p/9405907.html

2.2.5 启动、关闭、数据清理

启动

1
2
$ cassandra -f # 前台启动,在终端上打印日志,建议使用
$ cassandra # 后台启动,不在终端打印日志

停止 Cassandra:

1
2
$ ps -ef | grep cassandra
$ sudo kill pid

清理数据:

1
$ sudo rm -rf /var/lib/apache-cassandra-2.1.9/data/*

进入cqlsh

1
$ bin/cqlsh

3、cqlsh

CQL是Cassandra Query Language的缩写,目前作为Cassandra默认并且主要的交互接口。CQL和SQL语法很相似,主要的区别是cql不支持join和子查询,相对来说没有sql那么强大。

3.1 登录

1
2
cqlsh 10.0.102.116 -u 'backend' -p 'backend' --file="/data/cql/users.cql"  #登录并执行某个cql文件
bin/cql

3.2 语句

3.2.1 other

1
2
3
4
5
6
7
$ cqlsh> help #帮助命令
$ cqlsh> capture '/data/cql/result/output' # 捕获命令,所有的select查询的结果都将保存在output文件中.
$ cqlsh:test> capture off; # 关闭捕获
$ cqlsh:test> copy users(id, username, age) to 'D:\myfile' # 复制命令copy to, 将表中的数据写入到文件中
$ cqlsh> expand on; # 扩展命令,使用命令后select输出的结果展示形式不一样;
$ cqlsh> quit # 推出cqlsh
$ cqlsh> show host; #show命令:显示当前cqlsh会话的详细信息

3.2.2 keyspace

键空间:是列族(表)、索引等容器,类似于mysql中的数据库database,类似于oracle中的表空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 # 显示所有的keyspace
$ cqlsh> describe keyspaces;

# 创建键空间
$ cqlsh> CREATE KEYSPACE users WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1' : 1, 'DC2' : 3} AND durable_writes = true;

# 再创建一个
$ cqlsh> CREATE KEYSPACE schema1
WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

# 修改键空间的replication 和durable_writes
$ cqlsh> cqlsh> ALTER KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor' : 1} AND durable_writes = true;

# 使用键空间
$ cqlsh> use schema1;

# 删除键空间
$ cqlsh> drop keyspace test2;

(1)、创建键空间

1
2
3
CREATE KEYSPACE  
WITH REPLICATION = {'class':'Strategy Name','replication_factor': int}
AND durable_writes = boolean;

策略class

  • SimpleStrategy:简单策略,在一个数据中心的情况下使用
    • SimpleStrategy:简单策略,在一个数据中心的情况下使用
    • NetworkTopologyStrategy:网络拓扑策略,用于多个数据中心
  • 复制因子replication_factor:副本数
  • 持久写入属性:durable_write:boolean值,默认true

(2)、 修改键空间的replication 和durable_writes

1
2
3
ALTER KEYSPACE 
WITH REPLICATION = {'class': 'strategy name', 'replication_factor': int}
AND durable_writes = boolean;

3.2.3 table | columnfamily

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 创建表
$ cqlsh> CREATE TABLE users (
user_id varchar PRIMARY KEY,
first varchar,
last varchar,
age int
)with comment = 'user info table';

CREATE TABLE user (
id varchar,
first varchar,
last varchar,
age int,
PRIMARY KEY(id,age)
)with comment = 'user info table';

# 插入
$ cqlsh> INSERT INTO users (user_id, first, last, age)
VALUES ('jsmith', 'John', 'Smith', 42);

# 查询
$ cqlsh> SELECT * FROM users;

# 查看所有表
$ cqlsh> describe tables;

# 查看表信息
$ cqlsh> describe table users;

# 修改表,添加一列
$ cqlsh> alter table users add temp varchar

# 修改表,删除一列
$ cqlsh> alter table users drop temp;

(1)、创建表

1
2
3
4
5
6
CREATE TABLE [IF NOT EXISTS]  (
column cql_type,
column cql_type,
column cql_type,
RIMARY KEY(column, column)
) [WITH property = value AND property = value ];
1
2
3
4
5
6
7
8
9
10
11
12
13
 ::= CREATE ( TABLE | COLUMNFAMILY ) ( IF NOT EXISTS )? 
'(' ( ',' )* ')'
( WITH

::= ( STATIC )? ( PRIMARY KEY )?
| PRIMARY KEY '(' ( ',' )* ')'

::=
| '(' (',' )* ')'


| COMPACT STORAGE
| CLUSTERING ORDER

CREATE TABLE语句创建一个新的table。每个table都是rows的集合(常常表示相关实体),定义了一些属性。注意,CREATE COLUMNFAMILY语法是支持的,作为CREATE TABLE的别名(由于历史原因)。

试图创建一个已经存在的table将返回一个错误,除非使用IF NOT EXISTS选项。如果使用它,语句将是空操作如果table已经存在。

在table内,row由PRIMARY KEY唯一标识。所以所有的table必须定义一个PRIMARY KEY。一个PRIMARY KEY可以由一个或多个columns组成。如果PRIMARY KEY只有一个column,则可以直接在此column之后定义。否则PRIMARY KEY必须逗号分割,以括号包含多个columns来定义。注意:

1
2
3
4
5
6
7
8
9
10
CREATE TABLE t (
k int PRIMARY KEY,
other text
)
// 等价于
CREATE TABLE t (
k int,
other text,
PRIMARY KEY (k)
)

介绍下cql_type

CQL支持一组丰富的数据类型(cql_type),包括

  • 原生类型(native_type)、
  • 集合类型(collection_type)、
  • 用户定义类型(user_defined_type)、
  • 元组类型(tuple_type)
  • 自定义类型(custom_type)

1)、原生类型native_type

所谓原生类型就是关系型数据库支持的常用的数据类型和Cassandra扩展的一些基本数据类型。

  • tinyint(8位有符号整数)、smallint(16位有符号整数)、int(32位有符号整数)、bigint(64-bit有符号long)、varint(任意精度整数)
  • float、double、decimal(可变精度小数)
  • ascii(ASCII字符串)、varchar、text(UTF-8编码字符串)
  • date(yyyy-mm-dd)、time(hh:mm:ss)、timestamp
  • boolean
  • blob(任意十六进制字节)
  • inet(ipv4或ipv6格式的IP地址)
  • uuid(标准uuid,使用uuid()函数生成uuid,blobAsUuid(timeuuidAsBlob(now())))
  • timeuuid(时间相关的uuid,可以使用now()作为值)
  • duration(具有纳秒精度的持续时间)
  • counter(为64位分布式计数器值)
  • ma (JSON风格的元素集合)
  • udt(自定义类型)

2)、集合数据类型collection_type

1
2
3
4
5
list    [value, value,...]
set {value, value, ...}
map {'key1':value1, 'key2':value2} 使用column['key']来访问
tuple (value, value, ...)
frozen(元组,集合,用户定义的类型, 存储Cassandra类型)

3)、用户定义类型UDT

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 创建类型
CREATE TYPE (
column cql_type,
column cql_type
);

cqlsh:test>create type address (
proivnce text,
city text,
region text,
town text
);

// 列举所有的类型
cqlsh:test> describe types;

// 查看某个类型
cqlsh:test> describe type address;

// 添加字段
ALTER TYPE ADD column cql_type;
cqlsh:test> alter type address add way text;

// 重命名字段
ALTER TYPE RENAME TO

cqlsh:test> alter type address rename way to road;

// 删除类型
DROP TYPE ;
cqlsh:test> drop type address;

(2)、修改表

  • 添加一列
    ALTER TABLE ADD column cql_type
1
cqlsh:test> alter table users add temp varchar
  • 删除一列

    ALTER TABLE DROP column

1
cqlsh:test> alter table users drop temp;
  • 删除多列

ALTER TABLE DROP (column, column)

1
cqlsh:test> alter table users drop (temp1,temp2);

(3)、删除表

drop table

(4)、截断表: 删除表中的所有行

truncate

(5)、所有表

describe tables

3.2.4 索引

  • 创建索引
    CREATE INDEX [name] ON

    cqlsh:test> create index users_username_idx on users(username);

  • 删除索引
    DROP INDEX [IF EXISTS] ;

3.2.4 表的简单类型CURD

  • insert
1
2
3
4
5
6
INSERT INTO (column, column)
VALUES(value, value)
[USING TTL seconds];

INSERT INTO
JSON ' {"key1": "value", "key2": "value"} '

For instance:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
cqlsh:test>insert into users(
id,
username,
age,
height,
birthday,
isvip,
salt,
ip,
hobbies,
skills,
scores,
tags,
createtime)
values(
1,
'mengday',
26,
135.5,
'1990-10-26',
true,
uuid(),
'192.168.1.1',
['java', 'iOS'],
{'eat', 'drink'},
{'china': 80, 'english': 90},
('mm', 'money'),
dateof(now())
);

cqlsh:test> insert into users(
id,
username,
age,
height,
birthday,
isvip,
salt,
ip,
hobbies,
skills,
scores,
tags,
createtime)
values(
2,
'mengdee',
36,
145.5,
'1989-06-06',
false,
blobAsUuid(timeuuidAsBlob(now())),
'192.168.11.11',
['java', 'php'],
{'play', 'happy'},
{'china': 90, 'english': 99},
('gg', 'rmb'),
dateof(now())
);

cqlsh:test>insert into users
json ' {"id": 3, "username": "mengdie", "age": 16}'
using ttl 3600;
  • select
    SELECT column, column FROM WHERE ;

For instance:

1
2
3
cqlsh:test> select * from users;
// 如果where条件中使用的字段没有创建索引,需要使用allow filtering表示强制查询
cqlsh:test cqlsh:test> select id, username, createtime, tags from users where id in(1, 2) and age > 18 and tags = ('mm', 'money') allow filtering;
  • update
1
2
3
4
5
UPDATE  [USING TTL seconds]
SET
column = value,
column = value
WHERE

For instance:

1
2
3
4
cqlsh:test> update users using ttl 60 set username = 'hehe' where id = 3;

// 当更新的条件不满足时相当于insert操作
cqlsh:test> update users set username = 'admin' where id = 999999;
  • delete
    • 删除行
1
DELETE FROM  WHERE 
  • 删除字段
1
2
DELETE column FROM  WHERE 
delete from users where user_id='jsmith';
  • batch
1
2
3
4
5
begin batch
;
;
;
apply batch;
1
2
3
4
5
cqlsh:test>begin batch
insert into users json ' {"id": 4, "username": "test", "age": 16}';
update users set age = 20 where id = 4;
delete age from users where id = 4;
apply batch;

3.2.5 集合操作CRUD

  • set 无序集合
1
2
3
4
5
6
7
8
创建表声明时使用set,
使用时使用一对花括号将多个值括起{value, value}

// 添加元素
cqlsh:test> update users set skills = skills + {'eat', 'drink', 'mm'} where id = 2;

// 删除元素
cqlsh:test> update users set skills = skills - {'eat', 'mm'} where id = 2;
  • list 有序集合, 允许重复,通过索引访问某个元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
声明时需要指定元素的数据类型list
使用时使用中括号将集合元素括起来, [value, value]

// 添加元素
cqlsh:test>update users set hobbies = hobbies + ['php', 'javascript'] where id = 1;

cqlsh:test>update users set hobbies = ['go'] + hobbies where id = 1;

// 删除元素
cqlsh:test> update users set hobbies = hobbies - ['php', 'javascript'] where id = 1;

// 修改指定位置的元素
cqlsh:test> update users set hobbies[0] = 'golang' where id = 1;

// 删除指定位置的元素
cqlsh:test> delete hobbies[0] from users where id = 1;
  • map
1
2
3
4
5
6
7
8
9
10
11
声明时使用
使用时是 {'key':value, 'key':value}

// 添加元素
cqlsh:test> update users set scores = scores + {'math': 80, 'physics': 88} where id = 1;
// 删除元素
cqlsh:test> update users set scores = scores - {'math', 'physics'} where id = 1;

cqlsh:test> delete scores['english'] from users where id = 1;
// 修改元素
cqlsh:test> update users set scores['china'] = 100 where id = 1;
  • tuple
1
2
3
4
声明时使用tuple
使用时使用(value, ...., value)

cqlsh:test> update users set tags = ('girl', '$') where id = 1;
  • 集合数据过滤contains
1
2
3
4
5
// 使用contains 对list、set集合中的元素进行过滤
cqlsh:test> select * from users where hobbies contains 'php' allow filtering;

// 使用contains key 对map集合进行过滤
cqlsh:test> select * from users where scores contains key 'english' allow filtering;

3.2.6 物化视图(Materialized View)

  • 创建视图

    1
    2
    3
    4
    CREATE MATERIALIZED VIEW [IF NOT EXISTS]  AS
    select_statement
    PRIMARY KEY (column, column)
    [with table_options];

    For instance:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    cqlsh:test> create materialized view user_view as 
    select id, username, salt, isvip from users where username is not null
    primary key (id, username)
    with comment = 'users view';

    cqlsh:test> select * from user_view;

    id | username | isvip | salt
    ----+----------+-------+--------------------------------------
    2 | mengdee | False | 2718b240-8fd7-11e7-b82c-9340daca092f
    4 | test | null | null
    1 | mengday | True | c80b339f-4d2a-4928-9974-603edc65785c
  • 修改视图选项
    ALTER MATERIALIZED VIEW WITH table_options

  • 删除视图
    DROP MATERIALIZED VIEW [IF EXISTS]

3.2.7 函数

  • 预定义函数

    • count():求行数count(*)
    • now(): 当前时间
    • uuid(): 生成一个uuid值
    • min():求最小值
    • max():求最大值
    • sum():求和
    • avg():求平均数
    • cast(column as cql_type): 转换成其他基本数据类型
    • minTimeuuid(): minTimeuuid(‘2013-02-02 10:00+0000’)
    • maxTimeuuid(): maxTimeuuid(‘2013-01-01 00:05+0000’)

    timeuuid、date、timestamp、bigInt 之间的相互转换函数
    Function name、Input type 、Description

    • toDate timeuuid Converts the timeuuid argument into a date type
    • toDate timestamp Converts the timestamp argument into a date type
    • toTimestamp timeuuid Converts the timeuuid argument into a timestamp type
    • toTimestamp date Converts the date argument into a timestamp type
    • toUnixTimestamp timeuuid Converts the timeuuid argument into a bigInt raw value
    • toUnixTimestamp timestamp Converts the timestamp argument into a bigInt raw value
    • toUnixTimestamp date Converts the date argument into a bigInt raw value
    • dateOf timeuuid Similar to toTimestamp(timeuuid) (DEPRECATED)
    • unixTimestampOf timeuuid Similar totoUnixTimestamp(timeuuid) (DEPRECATED)
1
2
3
cqlsh:test> select cast(height as int) from users;
cqlsh:test> select count(*) as count, min(height) as min, max(height) as max, sum(height) as sum, avg(height) as avg, now() as now, uuid() as uuid fro
m users;
  • 自定义函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
create [or replace] function [if not exists] (arg1 int, arg2 text, ...)
returns null on null input
returns
language java
as $$
// some java code
return arg;
$$;

create function if not exists .(argname cql_type)
called on null input
returns
language java
as $$
// some java code
$$;

3.2.8 简单查看

下面给出了Cqlsh记录的shell命令。这些是用于执行任务的命令,如显示帮助主题,退出cqlsh,描述等。

  • HELP -显示所有cqlsh命令的帮助主题。
  • CAPTURE -捕获命令的输出并将其添加到文件。
  • CONSISTENCY -显示当前一致性级别,或设置新的一致性级别。
  • COPY -将数据复制到Cassandra并从Cassandra复制数据。
  • DESCRIBE -描述Cassandra及其对象的当前集群。
  • EXPAND -纵向扩展查询的输出。
  • EXIT -使用此命令,可以终止cqlsh。
  • PAGING -启用或禁用查询分页。
  • SHOW -显示当前cqlsh会话的详细信息,如Cassandra版本,主机或数据类型假设。
  • SOURCE -执行包含CQL语句的文件。
  • TRACING -启用或禁用请求跟踪。

CQL数据定义命令

  • CREATE KEYSPACE -在Cassandra中创建KeySpace。
  • USE -连接到已创建的KeySpace。
  • ALTER KEYSPACE -更改KeySpace的属性。
  • DROP KEYSPACE -删除KeySpace。
  • CREATE TABLE -在KeySpace中创建表。
  • ALTER TABLE -修改表的列属性。
  • DROP TABLE -删除表。
  • TRUNCATE -从表中删除所有数据。
  • CREATE INDEX -在表的单个列上定义新索引。
  • DROP INDEX -删除命名索引。
CQL数据操作指令
  • INSERT -在表中添加行的列。
  • UPDATE -更新行的列。
  • DELETE -从表中删除数据。
  • BATCH -一次执行多个DML语句。

CQL字句

  • SELECT -此子句从表中读取数据
  • WHERE -where子句与select一起使用以读取特定数据。
  • ORDERBY -orderby子句与select一起使用,以特定顺序读取特定数据。

3.3 集群配置

3.3.1 相关配置介绍

集群主要配置:

1
2
3
4
5
cluster_name:集群名,同一集群的多个节点,集群名要一致;
seeds: 种子节点,集群中的全部机器的ip,以逗号隔开选择一些稳定的服务器作为种子节点,不需要配置全部节点
storage_port: Cassandra服务器与服务器之间连接的端口号,一般不需要修改,但要保证此端口上没有防火墙;
listen_address: Cassandra集群中服务器与服务器之间相互通信的地址。如果留空,将默认使用服务器的机器名;
native_transport_port: 默认的CQL本地服务端口,本地的cql客户端与服务器交互的端口;

seed节点(节点的代表):

当节点第一次启动时,它会去查配置文件cassandra.yaml从而得到它属于的集群名称,但是它如何获得集群中其他节点的信息呢?就是通过种子节点(seed node).记住,同一集群中所有的节点的cassandra.yaml中必须有相同的种子节点列表。

选派谁做种子节点没什么特别的意义,仅仅在于新节点加入到集群中时走gossip流程时有用,所以它们没什么特权。

默认Cassandra使用端口

1
2
3
4
7000作为集群通信端口(如果开启了SSL就是7001端口)。
9042端口用于native协议的客户端连接。
7199端口用于JMX,
9160端口用于废弃的Thrift接口

集群主要配置文件目录

1
2
3
data_file_directories: 数据文件存放的目录,一个或多个
commitlog_directory: 提交信息的日志文件存放的目录
saved_caches_directory: 缓存存放的目录

3.3.2 实践

3台机器:10.0.102.116,10.0.102.117,10.0.102.118

以10.0.102.116,10.0.102.117为种子服务器

  1. 如上步骤,在其他节点上创建用户,安装jdk、Cassandra、python
  2. 统一集群的名字
  3. 为每个节点分配一个IP (listen_address和rpc_address要使用IP地址,不要使用hostname)
  4. 确定种子节点,不需要配置全部节点。选一些稳定的机器
  5. 如果是多数据中心,为每个数据中心和机架确定命名约定。比如:DC1, DC2 或 100, 200 和 RAC1, RAC2 或 R101, R102。选择名称要慎重,不要妄想再重命名数据中心。
  6. 操作
    6.1. 配置cassandra.yaml,cluster_name集群名称|-seeds种子节点IP|listen_address和rpc_address指定为节点IP
    6.2. 如下配置:
1
2
3
4
5
6
7
8
cluster_name: 'XXOCluster'
seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
# Ex: ",,"
- seeds: "10.0.102.116,10.0.102.117"
listen_address: 10.0.102.116
rpc_address: 10.0.102.116
1
2
3
4
5
6
7
8
cluster_name: 'XXOCluster'
seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
# Ex: ",,"
- seeds: "10.0.102.116,10.0.102.117"
listen_address: 10.0.102.117
rpc_address: 10.0.102.117
1
2
3
4
5
6
7
8
cluster_name: 'XXOCluster'
seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
# Ex: ",,"
- seeds: "10.0.102.116,10.0.102.117"
listen_address: 10.0.102.118
rpc_address: 10.0.102.118

启动节点,优先启动种子节点,查看种子节点日志能看到其他节点和种子节点的一些通信。

启动非种子节点

image-20201029100126677

启动其它种子节点

image-20201029100021253

  1. bin/nodetool status查看集群信息
1
$ bin/nodetool status

image-20201029095326858

  1. 节点故障日志

image-20201029104108256

4、java连接

3.1 cassandra创建keyspace和table

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ bin/cqlsh
cqlsh> CREATE KEYSPACE schema1
WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
cqlsh> USE schema1;
cqlsh:Schema1> CREATE TABLE users (
user_id varchar PRIMARY KEY,
first varchar,
last varchar,
age int
);
cqlsh:Schema1> INSERT INTO users (user_id, first, last, age)
VALUES ('jsmith', 'John', 'Smith', 42);
cqlsh:Schema1> SELECT * FROM users;
user_id | age | first | last
---------+-----+-------+-------
jsmith | 42 | john | smith
cqlsh:Schema1>

3.2 java springBoot连接

3.2.1 pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 
<dependency>
<groupId>com.datastax.cassandragroupId>
<artifactId>cassandra-driver-coreartifactId>
<version>3.8.0version>
dependency>

<dependency>
<groupId>com.datastax.cassandragroupId>
<artifactId>cassandra-driver-mappingartifactId>
<version>3.8.0version>
dependency>

<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-data-cassandraartifactId>
<version>2.2.5.RELEASEversion>
dependency>

3.2.2 application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
################# SERVER配置 #################
logging:
level:
com.funzzz: info #自己项目包名下的日志级别
spring:
application:
name: cassandra
data:
cassandra:
cluster-name: Test Cluster
keyspace-name: schema1
contact-points: 10.0.102.116
port: 9042
# username: backend
# password: backend
use:
cassandra: true

3.2.3 config

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package com.funzzz.common.config;

import static com.datastax.driver.core.schemabuilder.SchemaBuilder.createKeyspace;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.DefaultNamingStrategy;
import com.datastax.driver.mapping.DefaultPropertyMapper;
import com.datastax.driver.mapping.MappingConfiguration;
import com.datastax.driver.mapping.MappingManager;
import com.datastax.driver.mapping.NamingConventions;
import com.datastax.driver.mapping.NamingStrategy;
import com.datastax.driver.mapping.PropertyMapper;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.cassandra.repository.config.EnableCassandraRepositories;

/**
* @author dongdong
* @date 2020-10-28 11:45
**/
@Slf4j
@Configuration
@ConditionalOnProperty(name = "use.cassandra", havingValue = "true")
@EnableCassandraRepositories(basePackages = "com.funzzz.mapper")
public class CassandraConfig {

@Value("${spring.data.cassandra.keyspace-name}")
private String keyspaceName;

@Value("${spring.data.cassandra.contact-points}")
private String host;

/**
* 端口
*/
@Value("${spring.data.cassandra.port}")
private int port;

/**
* 集群名称
*/
@Value("${spring.data.cassandra.cluster-name}")
private String clusterName;

/**
* 用户名
*/
@Value("${spring.data.cassandra.username:#{null}}")
private String userName;

/**
* 密码
*/
@Value("${spring.data.cassandra.password:#{null}}")
private String password;


@Bean
public Cluster cluster() {
return Cluster.builder()
.addContactPoints(host.split(","))
.withPort(port)
.withClusterName(clusterName)
.withCredentials(userName, password)
.withoutJMXReporting()
.build();
}

@Bean
public NamingStrategy namingStrategy() {
//java 驼峰命令, 对应cassandra选择a_a_b, 个人喜好^-^
return new DefaultNamingStrategy(NamingConventions.LOWER_CAMEL_CASE, NamingConventions.LOWER_SNAKE_CASE);
}

/**
* 根据Cluster 拿到session,并判断是否存在keyspace,不存在则创建新的
* @param cluster cluster
* @return session
*/

@Bean
public Session session(@Autowired Cluster cluster) {
Metadata metadata = cluster.getMetadata();
Session session = cluster.connect();
// 说明没有对应的keyspaceName,则新建
if (Objects.isNull(metadata.getKeyspace(keyspaceName))) {
setupKeyspace(session, keyspaceName);
}
session.execute("USE " + keyspaceName);
return session;
}

/**
* 如果keyspace不存在就创建新的keyspace
* @param session 根据驱动拿到连接的session
* @param keyspace keyspace
*/
private void setupKeyspace(@Autowired Session session, String keyspace) {
Map replication = new HashMap<>();
// 如果不存在创建新的keyspace
replication.put("class", "SimpleStrategy");
replication.put("replication_factor", 1);
session.execute(createKeyspace(keyspace).ifNotExists().with().replication(replication));
}

@Bean
public MappingManager mappingManager(Session session,@Autowired NamingStrategy namingStrategy) {
PropertyMapper propertyMapper = new DefaultPropertyMapper().setNamingStrategy(namingStrategy);
MappingConfiguration configuration = MappingConfiguration.builder().withPropertyMapper(propertyMapper).build();
MappingManager result = new MappingManager(session, configuration);
// CassandraHolder.MANAGER = result;
return result;
}

}

注:@EnableCassandraRepositories注解运行使用mybatis注解查询

3.2.4 Entity

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.funzzz.model;

import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import lombok.Data;

/**
* @author dongdong
* @date 2020-10-28 13:44
**/
@Data
@Table(name = "user", keyspace = "schema1")
public class User {
@PartitionKey
@Column(name = "id")
private String id;

@Column(name = "age")
private Integer age;

@Column(name = "first")
private String first;

@Column(name = "last")
private String last;
}

3.2.5 mapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.funzzz.mapper;

import com.datastax.driver.mapping.annotations.Accessor;
import com.datastax.driver.mapping.annotations.Param;
import com.funzzz.model.User;
import java.util.List;
import org.springframework.data.cassandra.repository.CassandraRepository;
import org.springframework.data.cassandra.repository.Query;
import org.springframework.stereotype.Repository;

/**
* @author dongdong
* @date 2020-11-04 19:12
**/
@Repository
@Accessor
public interface UserRepository extends CassandraRepository<User,String> {

/**
* 获取所有用户
* @return 用户列表
*/
@Query("select * from user")
List getAll();

/**
* 查看用户详情
* @param userId 用户id
* @return 用户信息
*/
@Query("select * from user where id = ?0")
List findAllByUserId(String userId);

/**
* 获取用户列表
* @param userIds 用户id列表
* @return 用户列表
*/
@Query("select * from user where id in ?0")
List findAllByUserIds(List userIds);

/**
* 插入一条数据
* @param id 用户id
* @param age 用户年龄
* @param first first name
* @param last last name
* @return 用户
*/
@Query("insert into schema1.user(id,age,first,last) values (:id, :age, :first, :last)")
User insertUser(@Param("id") String id, @Param("age") Integer age,@Param("first")String first,@Param("last")String last);

/**
* 更新用户信息
* @param id id
* @param age age
* @return 更新后用户信息
*/
@Query("UPDATE user SET age= :age WHERE id= :id")
User updateAge(@Param("id") String id, @Param("age") Integer age);

/**
* 删除一个用户
* @param userId 用户id
* @return 删除的用户信息
*/
@Query("DELETE FROM schema1.user WHERE id = :userId")
User deleteOne(@Param("userId") String userId);

/**
* 批量删除用户
* @param userIds 用户id列表
* @return 用户列表
*/
@Query("DELETE FROM schema1.user WHERE id in :userIds")
List batchDelete(@Param("userIds") List userIds);
}

3.2.6 service

1
2
3
public interface UserService extends BaseService<User,String> {
User updateAge( String id, Integer age);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.funzzz.service.impl;

import com.funzzz.mapper.UserRepository;
import com.funzzz.model.User;
import com.funzzz.service.BaseServiceImpl;
import com.funzzz.service.UserService;

import java.util.Arrays;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
* @author dongdong
* @date 2020-10-28 13:47
**/
@Service
@Slf4j
public class UserServiceImpl extends BaseServiceImpl<UserRepository,User,String> implements UserService {

@Autowired
private UserRepository userRepository;

@Override
public String table() {
return "user";
}

@Override
public List column() {
return Arrays.asList("id","age","first","last");
}

@Override
public List index() {
return Arrays.asList("id","age");
}

@Override
public List bindData(User data) {
return Arrays.asList(data.getId(),data.getAge(),data.getFirst(),data.getLast());
}

@Override
public int deleteByIds(List strings) {
List users = userRepository.batchDelete(strings);
return users == null ? 0 : users.size();
}


@Override
public User updateAge(String id, Integer age) {
return userRepository.updateAge(id,age);
}
}

3.2.7 controller

1
2
3
4
5
6
7
8
9
10
11
12
@RestController
@RequestMapping("api/v1/")
public class UserController {

@Resource
private UserService userService;

@RequestMapping("/user/list")
public List list() {
return userService.getAll();
}
}

3.2.8 application

1
2
3
4
5
6
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

因为之前习惯使用mybatis 通用mapper,所以对cassandra,基本操作也做了一层封装。

参考

1
2
3
4
5
6
7
8
9
https://www.iteblog.com/
https://blog.xiaoxiaomo.com/2017/11/21/Cassandra-%E7%94%A8%E6%88%B7%E5%AF%86%E7%A0%81%E8%AE%BE%E7%BD%AE/
https://www.codenong.com/36133127/
https://blog.csdn.net/vbirdbest/article/details/77802031
https://github.com/codefollower/CCUG/issues/7 #rpc_address 和 broadcast_rpc_address这两个参数的作用和配置问题
https://stackoverflow.com/questions/27215273/all-hosts-tried-for-query-failed
https://blog.csdn.net/cuixin20120511/article/details/104048396
https://blog.xiaoxiaomo.com/2017/11/16/Cassandra-%E6%90%AD%E5%BB%BA%E5%8D%95%E8%8A%82%E7%82%B9%E5%92%8C%E9%9B%86%E7%BE%A4/
https://www.cnblogs.com/Sungeek/p/12599552.html
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论