RabbitMQ (三) 簡單實作一個MQ

前言

本篇利用RabbitMQ client來簡單實現MQ功能.

在RabbitMQ中有很重要兩個角色,ProducerConsumer,下面這個範例使用c# console來實現.

我個人覺得APMQ協議和Http協議有點類似,一樣有Header(Properties),body…等等概念.

Producer Code

一開始我們宣告一個 ConnectionFactory 並設置RabbitMQ Server連接參數

  • UserName:使用者帳號
  • Password:使用者密碼
  • HostName:連接FQDN或IP

RabbitMQ預設密碼是 guest

//建立連接工廠
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

呼叫factory.CreateConnection 建立連接RabbitMQ連接物件,並呼叫 CreateModel方法建立一個channel Model

在之前有說過RabbitMq會利用同一個Connection來建立不同的channel來執行MQ連接.

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    #region 如果在RabbitMq手動建立可以忽略這段程式
    //建立一個Queue
    channel.QueueDeclare(queueName, false, false, false, null);
    //建立一個Exchange
    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
    //把Queue跟Exchange
    channel.QueueBind(queueName, exchangeName, routeKey); 
    #endregion

    Console.WriteLine("\nRabbitMQ連接成功,如需離開請按下Escape鍵");

    string input = string.Empty;
    do
    {
        input = Console.ReadLine();
        var sendBytes = Encoding.UTF8.GetBytes(input);
        //發布訊息到RabbitMQ Server
        channel.BasicPublish(exchangeName, routeKey, null, sendBytes);

    } while (Console.ReadKey().Key != ConsoleKey.Escape);
}

最後在使用 channel.BasicPublish方法 將訊息推送給指定交換器,因為是走tcp所以將訊息轉換成二進制流


Consumer Code

前面建立連接都大同小異都是利用ConnectionFactory來建立連接物件

ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

string exchangeName = "exchangeFanout";
string queueName = "FanoutQueue";
string routeKey = string.Empty;

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    //channel.QueueBind
    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    channel.BasicQos(0, 1, false);
    //接收到消息事件 consumer.IsRunning
    consumer.Received += (ch, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body);

        Console.WriteLine($"Queue:{queueName}收到資料: {message}");
        channel.BasicAck(ea.DeliveryTag, false);
    };

    channel.BasicConsume(queueName, false, consumer); 
    Console.WriteLine("接收訊息");
    Console.ReadKey();
}

值得一提的是 EventingBasicConsumer 這個類別有一個建構子函數,把 channel 物件傳入產生一個消費者

EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

在呼叫 EventingBasicConsumer.Received綁定接收訊息事件,

  1. 第一個參數是channel物件本身
  2. 第二個參數是 Message(訊息) 資訊

裡面有一個Body欄位可取得 傳送的二進制流資料


Demo

為了簡單演示範例 我讓使用者輸入一個數字來跑迴圈,Producer 會把數字傳給Exchange並平均分配給所有consumer

本次有兩個 consumer 等待接收資訊,我們可以看到Fanout交換器不用指定RouteKey且把訊息平均分配到consumer上

此文作者:Daniel Shih(石頭)
此文地址https://isdaniel.github.io/rabbitmq-fanout/
版權聲明:本博客所有文章除特別聲明外,均採用 CC BY-NC-SA 3.0 TW 許可協議。轉載請註明出處!


如果本文對您幫助很大,可街口支付斗內鼓勵石頭^^