PostgreSQL消息队列扩展PGMQ可以实现轻量级消息队列功能,提供了Rust库pgmq,我们一下在Rust中如何使用PGMQ消息队列。
pgmq库源码解析
实现思路就是封装SQL接口,实现队列的创建、删除、消息发送、消息读取、消息删除等操作。
#[derive(Clone, Debug)]
pub struct PGMQueueExt {
pub url: String, // 数据库连接url
pub connection: Pool<Postgres>, // 数据库连接池
}
// PGMQ消息队列元数据
pub struct PGMQueueMeta {
pub queue_name: String, // 队列名称
pub created_at: chrono::DateTime<Utc>, // 队列创建时间
pub is_unlogged: bool, // 是否为unlogged表
pub is_partitioned: bool, // 是否为分区表
}
impl PGMQueueExt {
// 连接到PostgreSQL数据库
pub async fn new(url: String, max_connections: u32) -> Result<Self, PgmqError> {
Ok(Self {
connection: connect(&url, max_connections).await?,
url,
})
}
// 调用SQL接口pgmq.create创建队列
pub async fn create_with_cxn<'c, E: sqlx::Executor<'c, Database = Postgres>>(
&self,
queue_name: &str,
executor: E,
) -> Result<bool, PgmqError> {
check_input(queue_name)?;
sqlx::query!(
"SELECT * from pgmq.create(queue_name=>$1::text);",
queue_name
)
.execute(executor)
.await?;
Ok(true)
}
// Errors when there is any database error and Ok(false) when the queue already exists.
pub async fn create(&self, queue_name: &str) -> Result<bool, PgmqError> {
self.create_with_cxn(queue_name, &self.connection).await?;
Ok(true)
}
// ... 省略其他方法,也都是调用SQL接口实现消息发送、读取、删除等操作
}
具体示例
我们以发布消息和消费消息的代码作为示例,看一下如何发布消息,如何读取消息。
发布者
use pgmq::{Message, PGMQueueExt};
use log::{info, error};
use env_logger::Env;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::time::Duration;
use chrono::{Local, DateTime};
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
struct MyMessage {
id: i32,
content: String,
}
#[tokio::main]
async fn main() {
env_logger::Builder::from_env(Env::default().default_filter_or("info"))
.init();
info!("rust publisher exapmle of pgmq");
let dburl = "postgres://postgres:postgres@localhost:5432/postgres";
let pool = PgPool::connect(dburl).await.expect("Failed to connect to the database");
let queue = PGMQueueExt::new_with_pool(pool).await;
let qname = "myqueue";
queue.create(&qname).await.expect("Failed to create queue");
// 发布消息
for i in 1..1000 {
let now: DateTime<Local> = Local::now();
let timestr = now.format("%Y-%m-%d %H:%M:%S").to_string();
let msg = MyMessage {
id: i,
content: timestr,
};
let send = queue.send(&qname, &msg).await;
match send {
Ok(_) => info!("Message sent {} successfully", i),
Err(e) => error!("Failed to send message: {}", e),
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
info!("1000 messages sent successfully");
}
消费者
use pgmq::{Message, PGMQueueExt, PgmqError};
use log::{info, error};
use env_logger::Env;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::time::Duration;
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
struct MyMessage {
id: i32,
content: String,
}
#[tokio::main]
async fn main() {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
info!("rust consumer exapmle of pgmq");
let dburl = "postgres://postgres:postgres@localhost:5432/postgres";
let pool = PgPool::connect(dburl).await.expect("Failed to connect to the database");
let queue = PGMQueueExt::new_with_pool(pool).await;
let qname = "myqueue";
queue.create(&qname).await.expect("Failed to create queue");
// 消费消息
loop {
let rev: Result<Option<Vec<Message<MyMessage>>>, PgmqError> = queue.read_batch_with_poll(&qname,3,5,Some(Duration::from_secs(1)), Some(Duration::from_millis(100))).await;
match rev {
Ok(messages) => {
if let Some(msgs) = messages {
for msg in msgs {
info!("Received message: {:?}", msg);
let i = msg.msg_id;
queue.delete(&qname, i).await.expect("Failed to delete message");
info!("Deleted message with ID: {}", i);
}
} else {
info!("No messages received, sleeping for a while...");
tokio::time::sleep(Duration::from_millis(200)).await;
}
},
Err(err) => {
error!("Error reading messages: {}", err);
}
}
}
}
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




