暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

Rust中使用RabbitMQ消息队列

325

使用RabbitMQ需要用实现了AMQP协议的客户端,lapin是Rust中一个实现了AMQP协议的客户端,这里使用lapin来连接RabbitMQ。关于AMQP协议,可以参考这篇文章一篇文章讲透彻了AMQP协议以及AMQP协议原文

工作过程

发布者(Publisher)发布消息(Message),经由交换机(Exchange)。
交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。
最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

rabbitmq.png

简单场景

对简单的场景,比如不需要路由,不需要交换机(采用默认exchange),只需要一个队列,可以参考如下示例:
image.png

发布者

  1. 连接到RabbitMQ服务器
  2. 创建消息通道
  3. 声明一个队列
  4. 发布消息
use log::{info, error}; use env_logger::Env; use std::time::Duration; use lapin::{Connection, ConnectionProperties, options::QueueDeclareOptions, options::BasicPublishOptions, types::FieldTable, BasicProperties, types::ReplyCode}; #[tokio::main] async fn main() -> Result<(), lapin::Error> { env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); info!("This is an demo of using rabbitmq amqp client in rust"); // connect to rabbitmq and send a message let amqp_url = "amqp://guest:guest@localhost:5672/%2F"; // Producer和Consumer客户端通过TCP连接到rabbitmq服务器 let conn = Connection::connect(amqp_url, ConnectionProperties::default()).await?; // 创建消息通道 let channel_a = conn.create_channel().await?; let channel_b = conn.create_channel().await?; // 声明一个队列 let queue = channel_a .queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()) .await?; info!("Declared queue {:?}", queue); for i in 1..100 { let body = format!("hello world a {}", i); // 发送消息 let _confirm = channel_a .basic_publish( "", "hello", BasicPublishOptions::default(), body.as_bytes(), BasicProperties::default(), ) .await?; info!("channel_a Publish {}", body); tokio::time::sleep(Duration::from_secs(1)).await; } for i in 1..100 { let body = format!("hello world b {}", i); // 发送消息 let _confirm = channel_b .basic_publish( "", "hello", BasicPublishOptions::default(), body.as_bytes(), BasicProperties::default(), ) .await?; info!("channel_b Publish {}", body); tokio::time::sleep(Duration::from_secs(1)).await; } channel_a.close(200, "OK").await?; channel_b.close(200, "OK").await?; conn.close(200, "OK").await?; info!("Connection closed"); Ok(()) }

消费者

  1. 连接到RabbitMQ服务器
  2. 创建消息通道
  3. 声明一个队列
  4. 接收消息
use log::{info, error}; use env_logger::Env; use std::time::Duration; use std::string::String; use lapin::{Connection, ConnectionProperties, Consumer, options::*, types::FieldTable, BasicProperties}; use futures_lite::stream::StreamExt; #[tokio::main] async fn main() -> Result<(), lapin::Error> { env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); info!("This is an demo of using rabbitmq amqp client in rust"); // connect to rabbitmq and send a message let amqp_url = "amqp://guest:guest@localhost:5672/%2F"; // Producer和Consumer客户端通过TCP连接到rabbitmq服务器 let conn = Connection::connect(amqp_url, ConnectionProperties::default()).await?; // 创建消息通道 let channel_a = conn.create_channel().await?; // 声明一个队列 let queue = channel_a .queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()) .await?; info!("Declared queue {:?}", queue); // 接收消息 let mut consumer = channel_a.basic_consume( "hello", "my_consumer", lapin::options::BasicConsumeOptions::default(), FieldTable::default(), ).await?; while let Some(delivery) = consumer.next().await { match delivery { Ok(delivery) => { let msg = String::from_utf8(delivery.data).unwrap(); info!("Received message: {:?}", msg); } Err(e) => { error!("Error receiving message: {:?}", e); } } } channel_a.close(200, "Bye").await?; conn.close(200, "Bye").await?; Ok(()) }

fanout路由场景

该类型路由规则非常简单,会把所有发送到该exchange的消息路由到所有与它绑定的Queue中,相当于广播功能。

发布者

  1. 连接到RabbitMQ服务器
  2. 创建消息通道
  3. 声明队列
  4. 创建交换机
  5. 绑定队列到交换机
  6. 发布消息
use log::*; use env_logger::Env; use lapin::{Connection, ConnectionProperties, options::*, types::FieldTable, BasicProperties, ExchangeKind}; #[tokio::main] async fn main() -> Result<(), lapin::Error> { env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); info!("This is an demo of using rabbitmq amqp client in rust"); // connect to rabbitmq and send a message let amqp_url = "amqp://guest:guest@localhost:5672/%2F"; // Producer和Consumer客户端通过TCP连接到rabbitmq服务器 let conn = Connection::connect(amqp_url, ConnectionProperties::default()).await?; // 创建消息通道 let channel = conn.create_channel().await?; let mut queueoptions = QueueDeclareOptions::default(); queueoptions.durable = true; queueoptions.auto_delete = true; // 声明一个队列 channel.queue_declare("worker_1", queueoptions, FieldTable::default()).await?; channel.queue_declare("worker_2", queueoptions, FieldTable::default()).await?; // 创建一个fanout类型的交换机 channel.exchange_declare("myexchange", ExchangeKind::Fanout, ExchangeDeclareOptions::default(), FieldTable::default()).await?; // 绑定队列到交换机 channel.queue_bind("worker_1", "myexchange", "", QueueBindOptions::default(), FieldTable::default()).await?; channel.queue_bind("worker_2", "myexchange", "", QueueBindOptions::default(), FieldTable::default()).await?; let payload = b"Hello, world!"; // 发送消息 let mut publish_options = BasicPublishOptions::default(); publish_options.mandatory = false; let properties = BasicProperties::default(); let _confirm = channel.basic_publish("myexchange", "", publish_options, payload, properties).await?; info!("publish message: {:?}", String::from_utf8_lossy(payload)); channel.close(200, "Bye").await?; Ok(()) }

消费者

  1. 连接到RabbitMQ服务器
  2. 创建消息通道
  3. 声明队列
  4. 创建交换机
  5. 绑定队列到交换机
  6. 接收消息
use log::*; use env_logger::Env; use lapin::{Connection, ConnectionProperties, options::*, types::*, BasicProperties, ExchangeKind}; use futures_lite::stream::StreamExt; use std::env; #[tokio::main] async fn main() -> Result<(), lapin::Error> { env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); let mut args = Vec::new(); for arg in env::args().skip(1) { args.push(arg); } info!("This is an demo of using rabbitmq amqp client in rust"); // connect to rabbitmq and send a message let amqp_url = "amqp://guest:guest@localhost:5672/%2F"; // Producer和Consumer客户端通过TCP连接到rabbitmq服务器 let conn = Connection::connect(amqp_url, ConnectionProperties::default()).await?; // 创建消息通道 let channel = conn.create_channel().await?; let mut queueoptions = QueueDeclareOptions::default(); queueoptions.durable = true; queueoptions.auto_delete = true; let queue_name = String::from("worker_") + args[0].as_str(); info!("Queue name: {}", queue_name); // 声明一个队列 channel.queue_declare(&queue_name, queueoptions, FieldTable::default()).await?; // 创建一个fanout类型的交换机 channel.exchange_declare("myexchange", ExchangeKind::Fanout, ExchangeDeclareOptions::default(), FieldTable::default()).await?; // 绑定队列到交换机 channel.queue_bind(&queue_name, "myexchange", "", QueueBindOptions::default(), FieldTable::default()).await?; let consumer_options = BasicConsumeOptions::default(); let properties = BasicProperties::default(); let consumer_tag = String::from("consumer_") + args[0].as_str(); // 消费者标签 // 消费消息 let mut consumer = channel.basic_consume(&queue_name, &consumer_tag, consumer_options, FieldTable::default()).await?; while let Some(delivery) = consumer.next().await { let delivery = delivery?; let body = delivery.data; let delivery_tag = delivery.delivery_tag; info!("Received message: {:?}", String::from_utf8(body).unwrap()); channel.basic_ack(delivery_tag, BasicAckOptions::default()).await?; } channel.close(200, "Bye").await?; Ok(()) }

direct路由场景

direct模式是RabbitMQ默认的exchange类型,该类型路由规则非常简单,消息会被路由到binding key与routing key完全匹配的Queue中。

发布者

  1. 连接到RabbitMQ服务器
  2. 创建消息通道
  3. 声明队列
  4. 创建交换机
  5. 绑定队列到交换机,指定路由键
  6. 发布消息,指定路由键, 注意与fanout的区别,fanout不用指定路由键
use log::*; use env_logger::Env; use lapin::{Connection, ConnectionProperties, options::*, types::FieldTable, BasicProperties, ExchangeKind}; #[tokio::main] async fn main() -> Result<(), lapin::Error> { env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); info!("This is an demo of using rabbitmq amqp client in rust"); // connect to rabbitmq and send a message let amqp_url = "amqp://guest:guest@localhost:5672/%2F"; // Producer和Consumer客户端通过TCP连接到rabbitmq服务器 let conn = Connection::connect(amqp_url, ConnectionProperties::default()).await?; // 创建消息通道 let channel = conn.create_channel().await?; let mut queueoptions = QueueDeclareOptions::default(); queueoptions.durable = true; queueoptions.auto_delete = true; // 声明一个队列 channel.queue_declare("worker_1", queueoptions, FieldTable::default()).await?; channel.queue_declare("worker_2", queueoptions, FieldTable::default()).await?; // 创建一个direct类型的交换机 channel.exchange_declare("exchange-direct", ExchangeKind::Direct, ExchangeDeclareOptions::default(), FieldTable::default()).await?; // 绑定队列到交换机,并指定路由键 channel.queue_bind("worker_1", "exchange-direct", "beijing", QueueBindOptions::default(), FieldTable::default()).await?; channel.queue_bind("worker_2", "exchange-direct", "hangzhou", QueueBindOptions::default(), FieldTable::default()).await?; let payload1 = b"Hello, beijing!"; // 发送消息 let payload2 = b"Hello, hangzhou!"; let mut publish_options = BasicPublishOptions::default(); publish_options.mandatory = false; let properties = BasicProperties::default(); // 发布消息时,需要指定交换机名称和路由键 let _confirm = channel.basic_publish("exchange-direct", "beijing", publish_options, payload1, properties.clone()).await?; info!("publish message: {:?}", String::from_utf8_lossy(payload1)); let _confirm = channel.basic_publish("exchange-direct", "hangzhou", publish_options, payload2, properties).await?; channel.close(200, "Bye").await?; Ok(()) }

消费者

  1. 连接到RabbitMQ服务器
  2. 创建消息通道
  3. 声明队列
  4. 创建direct类型交换机
  5. 绑定队列到交换机,指定路由键
  6. 接收消息
use log::*; use env_logger::Env; use lapin::{Connection, ConnectionProperties, options::*, types::FieldTable, BasicProperties, ExchangeKind}; use futures_lite::stream::StreamExt; use std::env; #[tokio::main] async fn main() -> Result<(), lapin::Error> { env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); info!("This is an demo of using rabbitmq amqp client in rust"); // 获取命令行参数, 第一个参数是队列ID,第二个参数是消费者ID let mut args = Vec::new(); for arg in env::args().skip(1) { args.push(arg); } let mut route_key = String::from(""); if args[0] == 1.to_string() { route_key = String::from("beijing"); } else if args[0] == 2.to_string() { route_key = String::from("hangzhou"); } else { panic!("Please input a valid queue ID"); } // connect to rabbitmq and send a message let amqp_url = "amqp://guest:guest@localhost:5672/%2F"; // Producer和Consumer客户端通过TCP连接到rabbitmq服务器 let conn = Connection::connect(amqp_url, ConnectionProperties::default()).await?; // 创建消息通道 let channel = conn.create_channel().await?; let mut queueoptions = QueueDeclareOptions::default(); queueoptions.durable = true; queueoptions.auto_delete = true; let queue_name = String::from("worker_") + args[0].as_str(); info!("Queue name: {}", queue_name); // 声明一个队列 channel.queue_declare(&queue_name, queueoptions, FieldTable::default()).await?; // 创建一个direct类型的交换机 channel.exchange_declare("exchange-direct", ExchangeKind::Direct, ExchangeDeclareOptions::default(), FieldTable::default()).await?; // 绑定队列到交换机,并指定路由键 channel.queue_bind(&queue_name, "exchange-direct", &route_key, QueueBindOptions::default(), FieldTable::default()).await?; let properties = BasicProperties::default(); let consumer_tag = String::from("consumer_") + args[1].as_str(); // 消费者标签 // 消费消息 let mut consumer = channel.basic_consume(&queue_name, &consumer_tag, BasicConsumeOptions::default(), FieldTable::default()).await?; while let Some(delivery) = consumer.next().await { let delivery = delivery?; let body = delivery.data; let delivery_tag = delivery.delivery_tag; info!("consumer {:?} received message: {:?} from queue {:?}", args[1], String::from_utf8(body).unwrap(), &queue_name); channel.basic_ack(delivery_tag, BasicAckOptions::default()).await?; } channel.close(200, "Bye").await?; Ok(()) }
最后修改时间:2025-05-06 11:33:27
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论