RabbitMQ简单来说就是一个消息队列的服务器软件,支持多种消息传递协议,消息排队,传递确认,到队列的灵活路由,多种交换类型。
RabbitMQ如何安装在前面已经写过,就不再重复,不清楚可以翻阅以下文章
部署完成RabbitMQ后,我们开启了 web 管理界面,从界面上来直观认识RabbitMQ的功能。
登录 web 管理界面后,我们可以看到有 6 个 tab,其中 overview 总览界面和 admin 用户管理界面可以暂时不理会。
其中,Connections 是RabbitMQ的连接,显示连接上RabbitMQ服务器的客户端连接。Channels 是显示客户端指定的频道,然后客户端在频道上去创建 Exchange 和 Queues,并将 Exchange 和 Queues 绑定在一起,使发起方和接收方能在特定的规则上收发消息。
我们现在通过.NETCORE 代码来直观感受一波操作,这里使用生产者消费者模式来展示。
首先我们创建一个.NETCORE 控制台 RabbitMQProducer,并通过 NUGET 管理器下载安装 RabbitMQ.Client(RabbitMQ 官方提供)
通过控制台,我们创建 Connections,Channels,Exchange 和 Queues,并将Exchange 和 Queues 绑定,通过调试我们可以看到以下效果
看起来我们已经把路造起来了,现在我们来搞个消息传递一下看看
using RabbitMQ.Client;
using System;
using System.Text;
namespace RabbitMQProducer
{
class Program
{
static void Main(string[] args)
{
//通过连接工厂创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "10.5.10.107";//RabbitMQ 服务器地址
factory.UserName = "test";//用户名
factory.Password = "test";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建一个频道;
using (IModel channel = connection.CreateModel())
{
//创建队列
channel.QueueDeclare(queue: "Producer", durable: true, exclusive: false, autoDelete: false, arguments: null);
//创建交换机
channel.ExchangeDeclare(exchange: "ProducerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//交换机和队列绑定
channel.QueueBind(queue: "Producer", exchange: "ProducerExChange", routingKey: "xzdjs", arguments: null);
Console.WriteLine($"客户端生产者已创建完毕!");
//发出 10 个消息到队列中去
for (int i = 0; i < 10; i++)
{
//这是要发送的消息
string msg = $"我该怎么和你解释我只是一条咸鱼 _{i + 1}";
//通过频道发送出去
channel.BasicPublish(exchange: "ProducerExChange",routingKey: "xzdjs",basicProperties: null, body: Encoding.UTF8.GetBytes(msg));
Console.WriteLine($"{msg} 已发送");
}
}
}
}
}
}
到这里,我们就把 10 条消息丢到了 RabbitMQ 服务器上了,那么如何把这些消息读取出来呢?
我们再创建一个 RabbitMQConsumer 控制台,同样安装 RabbitMQ.Client
然后前面的 Connections,Channels,Exchange 和 Queues,将 Exchange 和 Queues 绑定的代码是一样的,只要接到一样的路上,才能获取到消息。不同的是生产者是发送,消费者是接收。
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace RabbitMQConsumer
{
class Program
{
static void Main(string[] args)
{
//为了便于区分,这里设置一下字体颜色
Console.ForegroundColor = ConsoleColor.Green;
//通过连接工厂创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "10.5.10.107";//RabbitMQ 服务器地址
factory.UserName = "test";//用户名
factory.Password = "test";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建一个频道;
using (IModel channel = connection.CreateModel())
{
//创建队列
channel.QueueDeclare(queue: "Producer", durable: true, exclusive: false, autoDelete: false, arguments: null);
//创建交换机
channel.ExchangeDeclare(exchange: "ProducerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//交换机和队列绑定
channel.QueueBind(queue: "Producer", exchange: "ProducerExChange", routingKey: "xzdjs", arguments: null);
Console.WriteLine($"客户端消费者已创建完毕!");
//RabbitMQ 消费消息是通过事件驱动的:
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => //如果有消息进入到 RabbitMQ,就会触发这个事件来完成消息的消费;
{
var body = ea.Body;
var msg = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"接受消息: {msg}");
};
channel.BasicConsume(queue: "Producer",autoAck: true,consumer: consumer);
Console.ReadLine();
}
}
}
}
}
获取后,RabbitMQ服务器上相应队列里的消息就没有了。
本次就简单认识一下 RabbitMQ 的队列消息是如何在.NETCORE 上运行的。后续再对队列的优先级、传递确认、灵活路由、Exchange 的四种类型进行展开演示。