🚫 Ad Blocker Detected

Please disable your AD blocker to continue using this site. Ads help us keep the content free! please press keyboard F5 to refresh page after disabled AD blocker

請關閉廣告攔截器以繼續使用本網站。廣告有助於我們保證內容免費。謝謝! 關閉後請按 F5 刷新頁面

0%

RabbitMQ (三) 簡單實作一個MQ

前言

本篇利用RabbitMQ client來簡單實現MQ功能.

在RabbitMQ中有很重要兩個角色,ProducerConsumer,下面這個範例使用c# console來實現.

我個人覺得APMQ協議和Http協議有點類似,一樣有Header(Properties),body…等等概念.

Producer Code

一開始我們宣告一個 ConnectionFactory 並設置RabbitMQ Server連接參數

  • UserName:使用者帳號
  • Password:使用者密碼
  • HostName:連接FQDN或IP

RabbitMQ預設密碼是 guest

//建立連接工廠
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

呼叫factory.CreateConnection 建立連接RabbitMQ連接物件,並呼叫 CreateModel方法建立一個channel Model

在之前有說過RabbitMq會利用同一個Connection來建立不同的channel來執行MQ連接.

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    #region 如果在RabbitMq手動建立可以忽略這段程式
    //建立一個Queue
    channel.QueueDeclare(queueName, false, false, false, null);
    //建立一個Exchange
    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
    //把Queue跟Exchange
    channel.QueueBind(queueName, exchangeName, routeKey); 
    #endregion

    Console.WriteLine("\nRabbitMQ連接成功,如需離開請按下Escape鍵");

    string input = string.Empty;
    do
    {
        input = Console.ReadLine();
        var sendBytes = Encoding.UTF8.GetBytes(input);
        //發布訊息到RabbitMQ Server
        channel.BasicPublish(exchangeName, routeKey, null, sendBytes);

    } while (Console.ReadKey().Key != ConsoleKey.Escape);
}

最後在使用 channel.BasicPublish 方法 將訊息推送給指定交換器,因為是走tcp所以將訊息轉換成二進制流


Consumer Code

前面建立連接都大同小異都是利用ConnectionFactory來建立連接物件

ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

string exchangeName = "exchangeFanout";
string queueName = "FanoutQueue";
string routeKey = string.Empty;

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    //channel.QueueBind
    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    channel.BasicQos(0, 1, false);
    //接收到消息事件 consumer.IsRunning
    consumer.Received += (ch, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body);

        Console.WriteLine($"Queue:{queueName}收到資料: {message}");
        channel.BasicAck(ea.DeliveryTag, false);
    };

    channel.BasicConsume(queueName, false, consumer); 
    Console.WriteLine("接收訊息");
    Console.ReadKey();
}

值得一提的是 EventingBasicConsumer 這個類別有一個建構子函數,把 channel 物件傳入產生一個消費者

EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

在呼叫 EventingBasicConsumer.Received綁定接收訊息事件,

  1. 第一個參數是channel物件本身
  2. 第二個參數是 Message (訊息) 資訊

裡面有一個Body欄位可取得 傳送的二進制流資料


Demo

為了簡單演示範例 我讓使用者輸入一個數字來跑迴圈,Producer 會把數字傳給Exchange並平均分配給所有consumer

本次有兩個 consumer 等待接收資訊,我們可以看到Fanout交換器不用指定RouteKey且把訊息平均分配到consumer上

__此文作者__:Daniel Shih(石頭)
__此文地址__: https://isdaniel.github.io/rabbitmq-fanout/
__版權聲明__:本博客所有文章除特別聲明外,均採用 CC BY-NC-SA 3.0 TW 許可協議。轉載請註明出處!

如果本文對您幫助很大,可街口支付斗內鼓勵石頭^^