ScyllaDB是一个开源数据库,用于需要高性能和低延迟的数据密集型应用程序,非常适合Rust。与Rust编程语言和Tokio框架类似,ScyllaDB构建在异步、无阻塞的运行时上,适用于构建高度可靠的低延迟分布式应用程序。
ScyllaDB团队开发了scylla-rust-driver驱动程序,这是一个开源的ScyllaDB(和Apache Cassandra)Rust驱动程序。它是用纯Rust编写的,带有使用Tokio的完全异步API。您可以阅读更多关于其基准测试结果的信息,以及我们的开发人员如何解决性能回归问题。
我们最近开发了一个新的免费培训课程,介绍如何使用新的驱动程序与ScyllaDB集群进行交互。在本文中,我将介绍本课的基本部分,在本课中,您将构建一个简单的Rust应用程序,该应用程序将连接到一个ScyllaDB集群并执行基本查询。
为什么有Rust?
在我们深入学习新的Rust课程之前,让我们先解决一个显而易见的问题:为什么有Rust?
Rust是一种现代的、有表现力的语言,它越来越受欢迎,并被越来越广泛地使用。这是一种系统编程语言。然而,你几乎可以用它开发任何东西。它可以快速安全地运行,防止大多数崩溃,因为所有内存访问都已检查。它还消除了数据竞争。
此外,Rust还实现了一个独特而有趣的异步模型。也就是说,Rust的未来表示计算,向前推进这些异步计算的责任属于程序员。这允许以非常高效的方式创建异步程序,将分配需求降至最低,因为Rust的异步函数表示的状态机在编译时是已知的。
现在,进入新的Rust课程…
创建数据架构
我们课程中的示例Rust应用程序将能够存储和查询温度时间序列数据。每次测量将包含以下信息:
-
测量温度的传感器的传感器ID
-
测量温度的时间
-
温度值
首先,创建一个称为tutorial的键空间:
CREATE KEYSPACE IF NOT EXISTS tutorial
WITH REPLICATION = {
'class': 'SimpleStrategy',
'replication_factor': 1
};
根据所需查询是特定设备在给定时间间隔内报告的温度,创建下表:
CREATE TABLE IF NOT EXISTS tutorial.temperature (
device UUID,
time timestamp,
temperature smallint,
PRIMARY KEY(device, time)
);
您正在构建的应用程序将能够查询给定设备在选定时间范围内测量的所有温度。这就是为什么您将使用以下选择查询:
SELECT * FROM tutorial.temperature
WHERE device = ?
AND time > ?
AND time < ?;
哪里将?替换为实际值:设备ID、开始时间和结束时间。
使用Rust连接到数据库
应用程序名称为温度,所需的依赖关系在Cargo中定义。toml文件:
uuid = {version = "0.8", features = ["v4"]}
tokio = {version = "1.1.0", features = ["full"]}
scylla = "0.3.1"
futures = "0.3.6"
chrono = "0.4.0"
哪里:
-
uuid–提供uuid的包。
-
tokio–提供异步运行时以在中执行数据库查询。
-
scylla——Rust ScyllaDB/Casandra驱动。
-
chrono–与时间一起工作的套装。
主功能使用tokio异步工作。以下内容确保返回结果:
#[tokio::main]
async fn main() -> Result<()> {
...
}
/src/db.rs将保留处理ScyllaDB实例的逻辑。第一步是建立数据库会话。
use scylla::{Session, SessionBuilder};
use crate::Result;
pub async fn create_session(uri: &str) -> Result<Session> {
SessionBuilder::new()
.known_node(uri)
.build()
.await
.map_err(From::from)
}
初始化会话:
#[tokio::main]
async fn main() -> Result<()> {
println!("connecting to db");
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = db::create_session(&uri).await?;
todo!()
}
请注意。在create_会话后等待。这是因为异步函数返回未来。可以在其他异步函数中等待未来,以获取其实际值,在本例中是Result<Session,Error>。最后一个问题是在等待之后,我们要确保如果我们从create_session返回一个错误而不是会话,错误将向上传播,应用程序将终止,并打印错误。
接下来是文件/src/db.rs定义了用于创建键空间和表以存储温度测量值的函数。您将使用查询创建键空间和表:
use scylla::{IntoTypedRows, Session, SessionBuilder};
use uuid::Uuid;
use crate::{Duration, Result, TemperatureMeasurement};
static CREATE_KEYSPACE_QUERY: &str = r#"
CREATE KEYSPACE IF NOT EXISTS tutorial
WITH REPLICATION = {
'class': 'SimpleStrategy',
'replication_factor': 1
};
"#;
static CREATE_TEMPERATURE_TABLE_QUERY: &str = r#"
CREATE TABLE IF NOT EXISTS tutorial.temperature (
device UUID,
time timestamp,
temperature smallint,
PRIMARY KEY(device, time)
);
"#;
pub async fn initialize(session: &Session) -> Result<()> {
create_keyspace(session).await?;
create_temperature_table(session).await?;
Ok(())
}
async fn create_keyspace(session: &Session) -> Result<()> {
session
.query(CREATE_KEYSPACE_QUERY, ())
.await
.map(|_| ())
.map_err(From::from)
}
async fn create_temperature_table(session: &Session) -> Result<()> {
session
.query(CREATE_TEMPERATURE_TABLE_QUERY, ())
.await
.map(|_| ())
.map_err(From::from)
}
/src/db.rs文件定义插入查询。“、ScyllaDB将使用每个值替换?:
static ADD_MEASUREMENT_QUERY: &str = r#"
INSERT INTO tutorial.temperature (device, time, temperature)
VALUES (?, ?, ?);
"#;
pub async fn add_measurement(session: &Session, measurement: TemperatureMeasurement) -> Result<()> {
session
.query(ADD_MEASUREMENT_QUERY, measurement)
.await
.map(|_| ())
.map_err(From::from)
}
读取测量值
接下来,选择查询逻辑在/src/db.rs模块定义:
static SELECT_MEASUREMENTS_QUERY: &str = r#"
SELECT * FROM fast_logger.temperature
WHERE device = ?
AND time > ?
AND time < ?;
"#;
pub async fn select_measurements(
session: &Session,
device: Uuid,
time_from: Duration,
time_to: Duration,
) -> Result<Vec<TemperatureMeasurement>> {
session
.query(SELECT_MEASUREMENTS_QUERY, (device, time_from, time_to))
.await?
.rows
.unwrap_or_default()
.into_typed::<TemperatureMeasurement>()
.map(|v| v.map_err(From::from))
.collect()
}
重要步骤包括:
-
使用指定参数(设备ID、开始和结束日期)进行选择查询。
-
等待响应并将其转换为行。
-
行可能为空。unwrap_or_default默认值确保在这种情况下,您将获得一个空的Vec。
-
获得行后,使用into_typed:
()将每一行转换为,这将使用FromRow-derive。 -
由于into_typed返回一个结果,这意味着转换每个结果可能会失败。具有map(|v | v.map_err(From::From))确保每行的错误将转换为/src/result中定义的一般错误。卢比。
-
最后,collect将迭代值保存到向量中。
现在,回到/src/main.rs您可以看到其余的主要功能、导入和模块:
use uuid::Uuid;
use crate::duration::Duration;
use crate::result::Result;
use crate::temperature_measurement::TemperatureMeasurement;
mod db;
mod duration;
mod result;
mod temperature_measurement;
#[tokio::main]
async fn main() -> Result<()> {
println!("connecting to db");
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = db::create_session(&uri).await?;
db::initialize(&session).await?;
println!("Adding measurements");
let measurement = TemperatureMeasurement {
device: Uuid::parse_str("72f6d49c-76ea-44b6-b1bb-9186704785db")?,
time: Duration::seconds(1000000000001),
temperature: 40,
};
db::add_measurement(&session, measurement).await?;
let measurement = TemperatureMeasurement {
device: Uuid::parse_str("72f6d49c-76ea-44b6-b1bb-9186704785db")?,
time: Duration::seconds(1000000000003),
temperature: 60,
};
db::add_measurement(&session, measurement).await?;
println!("Selecting measurements");
let measurements = db::select_measurements(
&session,
Uuid::parse_str("72f6d49c-76ea-44b6-b1bb-9186704785db")?,
Duration::seconds(1000000000000),
Duration::seconds(10000000000009),
)
.await?;
println!(" >> Measurements: {:?}", measurements);
Ok(())
}
其他Rust学习机会
查看关于ScyllaDB大学的完整Rust教程,查看完整代码并自行运行示例。
原文标题:Tutorial: Build a Simple Rust App and Connect It to ScyllaDB NoSQL
原文作者:Guy Shtub
原文链接:https://dzone.com/articles/tutorial-build-a-simple-rust-app-and-connect-it-to




