使用RabbitMQ需要用实现了AMQP协议的客户端,lapin是Rust中一个实现了AMQP协议的客户端,这里使用lapin来连接RabbitMQ。关于AMQP协议,可以参考这篇文章一篇文章讲透彻了AMQP协议以及AMQP协议原文。
工作过程
发布者(Publisher)发布消息(Message),经由交换机(Exchange)。
交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。
最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

简单场景
对简单的场景,比如不需要路由,不需要交换机(采用默认exchange),只需要一个队列,可以参考如下示例:

发布者
- 连接到RabbitMQ服务器
- 创建消息通道
- 声明一个队列
- 发布消息
use log::{info, error};
use env_logger::Env;
use std::time::Duration;
use lapin::{Connection, ConnectionProperties,
options::QueueDeclareOptions, options::BasicPublishOptions, types::FieldTable,
BasicProperties, types::ReplyCode};
#[tokio::main]
async fn main() -> Result<(), lapin::Error> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
info!("This is an demo of using rabbitmq amqp client in rust");
// connect to rabbitmq and send a message
let amqp_url = "amqp://guest:guest@localhost:5672/%2F";
// Producer和Consumer客户端通过TCP连接到rabbitmq服务器
let conn = Connection::connect(amqp_url, ConnectionProperties::default()).await?;
// 创建消息通道
let channel_a = conn.create_channel().await?;
let channel_b = conn.create_channel().await?;
// 声明一个队列
let queue = channel_a
.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default())
.await?;
info!("Declared queue {:?}", queue);
for i in 1..100 {
let body = format!("hello world a {}", i);
// 发送消息
let _confirm = channel_a
.basic_publish(
"",
"hello",
BasicPublishOptions::default(),
body.as_bytes(),
BasicProperties::default(),
)
.await?;
info!("channel_a Publish {}", body);
tokio::time::sleep(Duration::from_secs(1)).await;
}
for i in 1..100 {
let body = format!("hello world b {}", i);
// 发送消息
let _confirm = channel_b
.basic_publish(
"",
"hello",
BasicPublishOptions::default(),
body.as_bytes(),
BasicProperties::default(),
)
.await?;
info!("channel_b Publish {}", body);
tokio::time::sleep(Duration::from_secs(1)).await;
}
channel_a.close(200, "OK").await?;
channel_b.close(200, "OK").await?;
conn.close(200, "OK").await?;
info!("Connection closed");
Ok(())
}
消费者
- 连接到RabbitMQ服务器
- 创建消息通道
- 声明一个队列
- 接收消息
use log::{info, error};
use env_logger::Env;
use std::time::Duration;
use std::string::String;
use lapin::{Connection, ConnectionProperties, Consumer,
options::*, types::FieldTable,
BasicProperties};
use futures_lite::stream::StreamExt;
#[tokio::main]
async fn main() -> Result<(), lapin::Error> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
info!("This is an demo of using rabbitmq amqp client in rust");
// connect to rabbitmq and send a message
let amqp_url = "amqp://guest:guest@localhost:5672/%2F";
// Producer和Consumer客户端通过TCP连接到rabbitmq服务器
let conn = Connection::connect(amqp_url, ConnectionProperties::default()).await?;
// 创建消息通道
let channel_a = conn.create_channel().await?;
// 声明一个队列
let queue = channel_a
.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default())
.await?;
info!("Declared queue {:?}", queue);
// 接收消息
let mut consumer = channel_a.basic_consume(
"hello",
"my_consumer",
lapin::options::BasicConsumeOptions::default(),
FieldTable::default(),
).await?;
while let Some(delivery) = consumer.next().await {
match delivery {
Ok(delivery) => {
let msg = String::from_utf8(delivery.data).unwrap();
info!("Received message: {:?}", msg);
}
Err(e) => {
error!("Error receiving message: {:?}", e);
}
}
}
channel_a.close(200, "Bye").await?;
conn.close(200, "Bye").await?;
Ok(())
}
fanout路由场景
该类型路由规则非常简单,会把所有发送到该exchange的消息路由到所有与它绑定的Queue中,相当于广播功能。
发布者
- 连接到RabbitMQ服务器
- 创建消息通道
- 声明队列
- 创建交换机
- 绑定队列到交换机
- 发布消息
use log::*;
use env_logger::Env;
use lapin::{Connection, ConnectionProperties,
options::*, types::FieldTable,
BasicProperties, ExchangeKind};
#[tokio::main]
async fn main() -> Result<(), lapin::Error> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
info!("This is an demo of using rabbitmq amqp client in rust");
// connect to rabbitmq and send a message
let amqp_url = "amqp://guest:guest@localhost:5672/%2F";
// Producer和Consumer客户端通过TCP连接到rabbitmq服务器
let conn = Connection::connect(amqp_url, ConnectionProperties::default()).await?;
// 创建消息通道
let channel = conn.create_channel().await?;
let mut queueoptions = QueueDeclareOptions::default();
queueoptions.durable = true;
queueoptions.auto_delete = true;
// 声明一个队列
channel.queue_declare("worker_1", queueoptions, FieldTable::default()).await?;
channel.queue_declare("worker_2", queueoptions, FieldTable::default()).await?;
// 创建一个fanout类型的交换机
channel.exchange_declare("myexchange", ExchangeKind::Fanout, ExchangeDeclareOptions::default(), FieldTable::default()).await?;
// 绑定队列到交换机
channel.queue_bind("worker_1", "myexchange", "", QueueBindOptions::default(), FieldTable::default()).await?;
channel.queue_bind("worker_2", "myexchange", "", QueueBindOptions::default(), FieldTable::default()).await?;
let payload = b"Hello, world!"; // 发送消息
let mut publish_options = BasicPublishOptions::default();
publish_options.mandatory = false;
let properties = BasicProperties::default();
let _confirm = channel.basic_publish("myexchange", "", publish_options, payload, properties).await?;
info!("publish message: {:?}", String::from_utf8_lossy(payload));
channel.close(200, "Bye").await?;
Ok(())
}
消费者
- 连接到RabbitMQ服务器
- 创建消息通道
- 声明队列
- 创建交换机
- 绑定队列到交换机
- 接收消息
use log::*;
use env_logger::Env;
use lapin::{Connection, ConnectionProperties,
options::*, types::*,
BasicProperties, ExchangeKind};
use futures_lite::stream::StreamExt;
use std::env;
#[tokio::main]
async fn main() -> Result<(), lapin::Error> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let mut args = Vec::new();
for arg in env::args().skip(1) {
args.push(arg);
}
info!("This is an demo of using rabbitmq amqp client in rust");
// connect to rabbitmq and send a message
let amqp_url = "amqp://guest:guest@localhost:5672/%2F";
// Producer和Consumer客户端通过TCP连接到rabbitmq服务器
let conn = Connection::connect(amqp_url, ConnectionProperties::default()).await?;
// 创建消息通道
let channel = conn.create_channel().await?;
let mut queueoptions = QueueDeclareOptions::default();
queueoptions.durable = true;
queueoptions.auto_delete = true;
let queue_name = String::from("worker_") + args[0].as_str();
info!("Queue name: {}", queue_name);
// 声明一个队列
channel.queue_declare(&queue_name, queueoptions, FieldTable::default()).await?;
// 创建一个fanout类型的交换机
channel.exchange_declare("myexchange", ExchangeKind::Fanout, ExchangeDeclareOptions::default(), FieldTable::default()).await?;
// 绑定队列到交换机
channel.queue_bind(&queue_name, "myexchange", "", QueueBindOptions::default(), FieldTable::default()).await?;
let consumer_options = BasicConsumeOptions::default();
let properties = BasicProperties::default();
let consumer_tag = String::from("consumer_") + args[0].as_str(); // 消费者标签
// 消费消息
let mut consumer = channel.basic_consume(&queue_name, &consumer_tag, consumer_options, FieldTable::default()).await?;
while let Some(delivery) = consumer.next().await {
let delivery = delivery?;
let body = delivery.data;
let delivery_tag = delivery.delivery_tag;
info!("Received message: {:?}", String::from_utf8(body).unwrap());
channel.basic_ack(delivery_tag, BasicAckOptions::default()).await?;
}
channel.close(200, "Bye").await?;
Ok(())
}
direct路由场景
direct模式是RabbitMQ默认的exchange类型,该类型路由规则非常简单,消息会被路由到binding key与routing key完全匹配的Queue中。
发布者
- 连接到RabbitMQ服务器
- 创建消息通道
- 声明队列
- 创建交换机
- 绑定队列到交换机,指定路由键
- 发布消息,指定路由键, 注意与fanout的区别,fanout不用指定路由键
use log::*;
use env_logger::Env;
use lapin::{Connection, ConnectionProperties,
options::*, types::FieldTable,
BasicProperties, ExchangeKind};
#[tokio::main]
async fn main() -> Result<(), lapin::Error> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
info!("This is an demo of using rabbitmq amqp client in rust");
// connect to rabbitmq and send a message
let amqp_url = "amqp://guest:guest@localhost:5672/%2F";
// Producer和Consumer客户端通过TCP连接到rabbitmq服务器
let conn = Connection::connect(amqp_url, ConnectionProperties::default()).await?;
// 创建消息通道
let channel = conn.create_channel().await?;
let mut queueoptions = QueueDeclareOptions::default();
queueoptions.durable = true;
queueoptions.auto_delete = true;
// 声明一个队列
channel.queue_declare("worker_1", queueoptions, FieldTable::default()).await?;
channel.queue_declare("worker_2", queueoptions, FieldTable::default()).await?;
// 创建一个direct类型的交换机
channel.exchange_declare("exchange-direct", ExchangeKind::Direct, ExchangeDeclareOptions::default(), FieldTable::default()).await?;
// 绑定队列到交换机,并指定路由键
channel.queue_bind("worker_1", "exchange-direct", "beijing", QueueBindOptions::default(), FieldTable::default()).await?;
channel.queue_bind("worker_2", "exchange-direct", "hangzhou", QueueBindOptions::default(), FieldTable::default()).await?;
let payload1 = b"Hello, beijing!"; // 发送消息
let payload2 = b"Hello, hangzhou!";
let mut publish_options = BasicPublishOptions::default();
publish_options.mandatory = false;
let properties = BasicProperties::default();
// 发布消息时,需要指定交换机名称和路由键
let _confirm = channel.basic_publish("exchange-direct", "beijing", publish_options, payload1, properties.clone()).await?;
info!("publish message: {:?}", String::from_utf8_lossy(payload1));
let _confirm = channel.basic_publish("exchange-direct", "hangzhou", publish_options, payload2, properties).await?;
channel.close(200, "Bye").await?;
Ok(())
}
消费者
- 连接到RabbitMQ服务器
- 创建消息通道
- 声明队列
- 创建direct类型交换机
- 绑定队列到交换机,指定路由键
- 接收消息
use log::*;
use env_logger::Env;
use lapin::{Connection, ConnectionProperties,
options::*, types::FieldTable,
BasicProperties, ExchangeKind};
use futures_lite::stream::StreamExt;
use std::env;
#[tokio::main]
async fn main() -> Result<(), lapin::Error> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
info!("This is an demo of using rabbitmq amqp client in rust");
// 获取命令行参数, 第一个参数是队列ID,第二个参数是消费者ID
let mut args = Vec::new();
for arg in env::args().skip(1) {
args.push(arg);
}
let mut route_key = String::from("");
if args[0] == 1.to_string() {
route_key = String::from("beijing");
} else if args[0] == 2.to_string() {
route_key = String::from("hangzhou");
} else {
panic!("Please input a valid queue ID");
}
// connect to rabbitmq and send a message
let amqp_url = "amqp://guest:guest@localhost:5672/%2F";
// Producer和Consumer客户端通过TCP连接到rabbitmq服务器
let conn = Connection::connect(amqp_url, ConnectionProperties::default()).await?;
// 创建消息通道
let channel = conn.create_channel().await?;
let mut queueoptions = QueueDeclareOptions::default();
queueoptions.durable = true;
queueoptions.auto_delete = true;
let queue_name = String::from("worker_") + args[0].as_str();
info!("Queue name: {}", queue_name);
// 声明一个队列
channel.queue_declare(&queue_name, queueoptions, FieldTable::default()).await?;
// 创建一个direct类型的交换机
channel.exchange_declare("exchange-direct", ExchangeKind::Direct, ExchangeDeclareOptions::default(), FieldTable::default()).await?;
// 绑定队列到交换机,并指定路由键
channel.queue_bind(&queue_name, "exchange-direct", &route_key, QueueBindOptions::default(), FieldTable::default()).await?;
let properties = BasicProperties::default();
let consumer_tag = String::from("consumer_") + args[1].as_str(); // 消费者标签
// 消费消息
let mut consumer = channel.basic_consume(&queue_name, &consumer_tag, BasicConsumeOptions::default(), FieldTable::default()).await?;
while let Some(delivery) = consumer.next().await {
let delivery = delivery?;
let body = delivery.data;
let delivery_tag = delivery.delivery_tag;
info!("consumer {:?} received message: {:?} from queue {:?}", args[1], String::from_utf8(body).unwrap(), &queue_name);
channel.basic_ack(delivery_tag, BasicAckOptions::default()).await?;
}
channel.close(200, "Bye").await?;
Ok(())
}
最后修改时间:2025-05-06 11:33:27
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




