.NET RabbitMQ 实战指南

备份交换器

备份交换器,英文名称为 Alternate Exchange,简称 AE。通过在声明交换器(调用 channel.ExchangeDeclare 方法)时添加 alternate-exchange 参数来实现。

备份交换器工作流程如下:

.NET RabbitMQ 实战指南


using (var channel = connection.CreateModel())
{
	//设置备胎交换器参数
	var arguments = new Dictionary();
	arguments.Add("alternate-exchange","myAe");
	channel.ExchangeDeclare("normalExchange", "direct",true,false, arguments);
	channel.ExchangeDeclare("myAe", "fanout", true, false);
	channel.QueueDeclare("nromalQueue",true,false,false);
	channel.QueueBind("nromalQueue", "normalExchange", "normalKey");
	channel.QueueDeclare("unroutedQueue", true, false, false);
	channel.QueueBind("unroutedQueue", "myAe","ae");

	var properties = channel.CreateBasicProperties();
	properties.DeliveryMode = 2;
	string message = "RabbitMQ Test"; //传递的消息内容
	channel.BasicPublish("normalExchange", "normalKey", properties, Encoding.UTF8.GetBytes(message)); //生产消息
	channel.BasicPublish("normalExchange", "un-routkey", properties, Encoding.UTF8.GetBytes(message)); //生产消息

	Console.WriteLine($"Send:{message}");
}

代码中声明了两个交换器 normalExchange 和 myAe,分别绑定了 normalQueue 和 unroutedQueue 这两个队列,同时将 myAe 设置为 normalExchange 的备份交换器。myAe 的交换器类型为 fanout。同时生成两条消息,其中“un-routkey”并没有定义对应路由。

运行效果:

.NET RabbitMQ 实战指南
切换到 queues 视图,可以看到两个队列分别有一条需要消费的消息。

.NET RabbitMQ 实战指南

备份交换器与普通的交换器没有太大的区别,为了方便使用,建议设置为 fanout 类型

过期时间(TTL)

TTL,Time to Live 的简称,即过期时间。RabbitMQ 可以对队列和消息设置 TTL。

设置队列的 TTL

channel.QueueDeclare 方法中的 x-expires 参数可以设置队列被自动删除前处于未使用状态的时间。未使用是指队列上没有任何的消费者,队列也没有被重新声明过,并且其间内也未调用过 Basic.Get 命令。


var arguments = new Dictionary();
arguments.Add("x-expires", 10000);  //单位毫秒
channel.QueueDeclare(arguments: arguments);

RabbitMQ 重启后,持久化的队列的过期时间会被重新计算。

设置消息的 TTL

消息的过期时间有两种设置方式。第一种是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息本身进行设置。

如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的 TTL 值,就会变成“死信”(Dead Message)

通过队列属性设置消息 TTL 的方法是在 channel.QueueDeclare 方法中加入 x-message-ttl 参数实现的,这个参数的单位是毫秒。


arguments.Add("x-message-ttl", 6000);  //单位毫秒
channel.QueueDeclare(arguments: arguments);

对消息本身进行过期时间设置前面代码有涉及过,是通过 BasicPublish 方法的 basicProperties 参数指定。


var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 1;
properties.Priority = 2;
properties.Expiration = "6000";
var message = "RabbitMQ Test"; //传递的消息内容
channel.BasicPublish("", "stacking", properties, Encoding.UTF8.GetBytes(message)); //生产消息

死信队列

DLX,全称为 Dead-Letter-Exchange,即死信交换器。当一个消息在队列中变成死信(dead message),它就会被重新被发送到 DLX(死信交换器),绑定 DLX 的队列就称之为死信队列。

通过在 channel.QueueDeclare 方法中设置 x-dead-letter-exchange 参数来为这个队列添加 DLX.

示例代码:


var arguments = new Dictionary();
arguments.Add("x-expires", 10000);  //单位毫秒
arguments.Add("x-message-ttl", 6000);  //单位毫秒
channel.QueueDeclare(arguments: arguments);

channel.ExchangeDeclare("dlx_exchange", "direct");
var argumentsDlx = new Dictionary();
argumentsDlx.Add("x-dead-letter-exchange", "dlx_exchange");
argumentsDlx.Add("x-dead-letter-routing-key", "dlx-routing-key"); //为 DLX 指定路由键,如果没有特殊指定,则使用原队列的路由键
channel.QueueDeclare("dlx_queue",false,false,false,argumentsDlx);

可以看到 dlx_queue 有标记了 DLX 和 DLK(Dead-Letter-Key)。

.NET RabbitMQ 实战指南

DLX 是一个非常有用的特性。异常情况下,消息不能够被消费者正确消费(消费者调用了 Basic.Nack 或者 Basic.Reject)而被置入死信队列中,后续分析程序可以通过死信队列中的内容来分析当时所遇到的异常情况,从而改善和优化系统。

延迟队列

延迟队列存储的是对应的延迟消息,“延迟消息”并不想让消费者立刻消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。如一个订单系统中,用户下单 30 分钟内没有完成支付那么就对该订单进行异常处理,就可以使用延迟队列来处理这些订单。

RabbitMQ 并支持延迟队列的功能,但是我们可以通过 DLX 和 TTL 模拟出延迟队列的功能。如下图所示:

.NET RabbitMQ 实战指南实例中设置了 5 秒、10 秒、30 秒、1 分钟四个延时等级。根据应用需求的不同,生产者在发送消息的时候设置不同的路由键,从而将消息发送到与交换器绑定的对应队列中。这些队列分别设置了过期时间为 5 秒、10 秒、30 秒、1 分钟,同时也分别配置了 DLX 和相应的死信队列。队列中消息过期时,就会转存到相应的死信队列(即延迟队列)中,在根据业务自身的情况,分别选择不同延迟等级的延迟队列进行消费。

优先级队列

优先级队列,有高优先级的队列具有高的优先权,优先级高的消息有优先被消费的特权。

可以通过设置队列的 x-max-priority 参数来实现。并在发送消息时通过 Priority 设置消息优先级。


using (var channel = connection.CreateModel())
{

     channel.ExchangeDeclare("priority_exchange", "direct");
     var arguments = new Dictionary();
     arguments.Add("x-max-priority", 10);  //
     channel.QueueDeclare("priority_queue", false, false, false, arguments);
     channel.QueueBind("priority_queue", "priority_exchange", "priority_key");

     var properties = channel.CreateBasicProperties();
     properties.Priority = 2;
     string message = "Priority 2"; //传递的消息内容
     channel.BasicPublish("priority_exchange", "priority_key", properties, Encoding.UTF8.GetBytes(message)); //生产消息
     properties.Priority = 5;
     message = "Priority 5"; //传递的消息内容
     channel.BasicPublish("priority_exchange", "priority_key", properties, Encoding.UTF8.GetBytes(message)); //生产消息
     var result = channel.BasicGet("priority_queue", false);
     channel.BasicAck(result.DeliveryTag, true);
     Console.WriteLine($"Received:{Encoding.UTF8.GetString(result.Body.ToArray())}");

}

运行上面的代码,不管我们如何调整生成消息的顺序,channel.BasicGet 取出来的始终是 Priority 为 5 的那条消息。

.NET RabbitMQ 实战指南

Github

示例代码地址:https://github.com/MayueCif/RabbitMQ

© 版权声明

☆ END ☆
喜欢就点个赞吧
点赞0 分享
图片正在生成中,请稍后...