暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

Rust使用PGMQ消息队列

188

PostgreSQL消息队列扩展PGMQ可以实现轻量级消息队列功能,提供了Rust库pgmq,我们一下在Rust中如何使用PGMQ消息队列。

pgmq库源码解析

实现思路就是封装SQL接口,实现队列的创建、删除、消息发送、消息读取、消息删除等操作。

#[derive(Clone, Debug)] pub struct PGMQueueExt { pub url: String, // 数据库连接url pub connection: Pool<Postgres>, // 数据库连接池 } // PGMQ消息队列元数据 pub struct PGMQueueMeta { pub queue_name: String, // 队列名称 pub created_at: chrono::DateTime<Utc>, // 队列创建时间 pub is_unlogged: bool, // 是否为unlogged表 pub is_partitioned: bool, // 是否为分区表 } impl PGMQueueExt { // 连接到PostgreSQL数据库 pub async fn new(url: String, max_connections: u32) -> Result<Self, PgmqError> { Ok(Self { connection: connect(&url, max_connections).await?, url, }) } // 调用SQL接口pgmq.create创建队列 pub async fn create_with_cxn<'c, E: sqlx::Executor<'c, Database = Postgres>>( &self, queue_name: &str, executor: E, ) -> Result<bool, PgmqError> { check_input(queue_name)?; sqlx::query!( "SELECT * from pgmq.create(queue_name=>$1::text);", queue_name ) .execute(executor) .await?; Ok(true) } // Errors when there is any database error and Ok(false) when the queue already exists. pub async fn create(&self, queue_name: &str) -> Result<bool, PgmqError> { self.create_with_cxn(queue_name, &self.connection).await?; Ok(true) } // ... 省略其他方法,也都是调用SQL接口实现消息发送、读取、删除等操作 }

具体示例

我们以发布消息和消费消息的代码作为示例,看一下如何发布消息,如何读取消息。

发布者
use pgmq::{Message, PGMQueueExt}; use log::{info, error}; use env_logger::Env; use serde::{Deserialize, Serialize}; use sqlx::PgPool; use std::time::Duration; use chrono::{Local, DateTime}; #[derive(Debug, Serialize, Deserialize, Eq, PartialEq)] struct MyMessage { id: i32, content: String, } #[tokio::main] async fn main() { env_logger::Builder::from_env(Env::default().default_filter_or("info")) .init(); info!("rust publisher exapmle of pgmq"); let dburl = "postgres://postgres:postgres@localhost:5432/postgres"; let pool = PgPool::connect(dburl).await.expect("Failed to connect to the database"); let queue = PGMQueueExt::new_with_pool(pool).await; let qname = "myqueue"; queue.create(&qname).await.expect("Failed to create queue"); // 发布消息 for i in 1..1000 { let now: DateTime<Local> = Local::now(); let timestr = now.format("%Y-%m-%d %H:%M:%S").to_string(); let msg = MyMessage { id: i, content: timestr, }; let send = queue.send(&qname, &msg).await; match send { Ok(_) => info!("Message sent {} successfully", i), Err(e) => error!("Failed to send message: {}", e), } tokio::time::sleep(Duration::from_millis(100)).await; } info!("1000 messages sent successfully"); }
消费者
use pgmq::{Message, PGMQueueExt, PgmqError}; use log::{info, error}; use env_logger::Env; use serde::{Deserialize, Serialize}; use sqlx::PgPool; use std::time::Duration; #[derive(Debug, Serialize, Deserialize, Eq, PartialEq)] struct MyMessage { id: i32, content: String, } #[tokio::main] async fn main() { env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); info!("rust consumer exapmle of pgmq"); let dburl = "postgres://postgres:postgres@localhost:5432/postgres"; let pool = PgPool::connect(dburl).await.expect("Failed to connect to the database"); let queue = PGMQueueExt::new_with_pool(pool).await; let qname = "myqueue"; queue.create(&qname).await.expect("Failed to create queue"); // 消费消息 loop { let rev: Result<Option<Vec<Message<MyMessage>>>, PgmqError> = queue.read_batch_with_poll(&qname,3,5,Some(Duration::from_secs(1)), Some(Duration::from_millis(100))).await; match rev { Ok(messages) => { if let Some(msgs) = messages { for msg in msgs { info!("Received message: {:?}", msg); let i = msg.msg_id; queue.delete(&qname, i).await.expect("Failed to delete message"); info!("Deleted message with ID: {}", i); } } else { info!("No messages received, sleeping for a while..."); tokio::time::sleep(Duration::from_millis(200)).await; } }, Err(err) => { error!("Error reading messages: {}", err); } } } }
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论