c#消息队列实际应用(消息队列和消息中间件的关系)
在消息队列模型中,如何将消息广播到所有的消费者,这种模式称为“发布/订阅”。本文主要以一个简单的小例子,简述通过fanout交换机,实现消息的发布与订阅,仅供学习分享使用,如有不足之处,还请指正。
Fanout交换机模型
扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
RabbitMQ控制台操作
新增两个队列
在同一个Virtual host下新增两个队列Q1,Q2,如下图所示:
绑定fanout交换机
将两个队列绑定到系统默认的fanout交换机,如下所示:
示例效果图
生产者,采用Fanout类型交换机发布消息,如下图所示:
当生产者发布 一条消息时,Q1,Q2两个队列均会收到,如下图所示:
当启动消费者后,两个消费者,均会订阅到相关消息,如下图所示:
核心代码
消息发布
建立连接后,将通道声明类型为Fanout的交换机,如下所示:
1 /// <summary> 2 /// fanout类型交换机,发送消息 3 /// </summary> 4 public class RabbitMqFanoutSendHelper : RabbitMqHelper { 5 /// <summary> 6 /// 发送消息 7 /// </summary> 8 /// <param name="msg"></param> 9 /// <returns></returns>10 public bool SendMsg(string msg)11 {12 try13 {14 using (var conn = GetConnection("/Alan.hsiang"))15 {16 using (var channel = conn.CreateModel())17 {18 channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);19 20 var body = Encoding.UTF8.GetBytes(msg);21 22 channel.BasicPublish(exchange: "amq.fanout",23 routingKey: ",24 basicProperties: null,25 body: body);26 27 //Console.WriteLine(" [x] Sent {0}", message);28 };29 };30 return true;31 }32 catch (Exception ex)33 {34 throw ex;35 }36 }37 }
消息订阅
建立连接后,通道声明类型为Fanout的交换机,并绑定队列进行订阅,如下所示:
1 /// <summary> 2 /// 扇形交换机接收消息 3 /// </summary> 4 public class RabbitMqFanoutReceiveHelper : RabbitMqHelper 5 { 6 public RabbitMqReceiveEventHandler OnReceiveEvent; 7 8 private IConnection conn; 9 10 private IModel channel;11 12 private EventingBasicConsumer consumer;13 14 public bool StartReceiveMsg(string queueName)15 {16 try17 {18 conn = GetConnection("/Alan.hsiang");19 20 channel = conn.CreateModel();21 channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);22 //此处随机取出交换机下的队列23 //var queueName = channel.QueueDeclare().QueueName;24 channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: ");25 consumer = new EventingBasicConsumer(channel);26 consumer.Received += (model, ea) =>27 {28 var body = ea.Body.ToArray();29 var message = Encoding.UTF8.GetString(body);30 //Console.WriteLine(" [x] Received {0}", message);31 if (OnReceiveEvent != null)32 {33 OnReceiveEvent(queueName+"::"+message);34 }35 };36 channel.BasicConsume(queue: queueName,37 autoAck: true,38 consumer: consumer);39 return true;40 }41 catch (Exception ex)42 {43 throw ex;44 }45 }46 }
免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com