0%

MessageWorkerPool framework introduction

簡介

最近我開發了 MessageWorkerPool 專案。其主要概念是提供一個平台框架,使使用者能夠快速且輕鬆地在 Worker 內實作邏輯。該設計高度靈活,允許基於我創建的 Worker 通訊協議,以多種程式語言實作 Worker。目前,我已提供使用 C#、Rust 和 Python 編寫的 Worker 範例。

這個函式庫在多進程環境中處理任務表現優異。此外,它還支援優雅關閉 (graceful shutdown),確保在隨時 consumer worker 能順利終止處理程序。

MessageWorkerPool GitHub

為什麼選擇 ProcessPool 而非 ThreadPool?

當你需要強大的隔離性,以防止某個任務影響其他任務時,應該選擇 ProcessPool,特別是針對關鍵操作或容易崩潰的任務。雖然 ThreadPool 較為輕量(因為執行緒共用記憶體並且具有較低的上下文切換開銷),但 ProcessPool 能夠提供更靈活的解決方案,允許使用不同的程式語言來實作 Worker。

安裝

要安裝 MessageWorkerPool 套件,請使用以下 NuGet 指令:

1
PM > Install-Package MessageWorkerPool

若要手動安裝此函式庫,可克隆儲存庫並建置專案:

1
2
3
git clone https://github.com/isdaniel/MessageWorkerPool.git
cd MessageWorkerPool
dotnet build

架構概覽

快速開始

這是部署 RabbitMQ 和相關服務的快速開始指南,使用提供的 docker-compose.yml 檔案和 .env 中的環境變數。

1
docker-compose --env-file .\env\.env up --build -d
  1. 檢查 RabbitMQ 健康狀態:在瀏覽器中開啟 http://localhost:8888 以訪問 RabbitMQ 管理面板。
  • 使用者名稱: guest
  • 密碼: guest
  1. 檢查 OrleansDashboard http://localhost:8899
  • 使用者名稱: admin
  • 密碼: test.123

程式結構

以下是創建並配置與 RabbitMQ 互動的 workerpool 的範例程式碼。以下是其功能的解析:workerpool 將根據您的 RabbitMqSetting 設定從 RabbitMQ 伺服器獲取訊息,並通過 Process.StandardInput 將訊息傳遞給用戶創建的真實 worker node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Program
{
public static async Task Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}

public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureLogging(logging =>
{
logging.ClearProviders();
logging.AddConsole(options => {
options.FormatterName = ConsoleFormatterNames.Simple;
});
logging.Services.Configure<SimpleConsoleFormatterOptions>(options => {
options.IncludeScopes = true;
options.TimestampFormat = " yyyy-MM-dd HH:mm:ss ";
});
}).AddRabbitMqWorkerPool(new RabbitMqSetting
{
UserName = Environment.GetEnvironmentVariable("USERNAME") ?? "guest",
Password = Environment.GetEnvironmentVariable("PASSWORD") ?? "guest",
HostName = Environment.GetEnvironmentVariable("RABBITMQ_HOSTNAME"),
Port = ushort.TryParse(Environment.GetEnvironmentVariable("RABBITMQ_PORT"), out ushort p) ? p : (ushort) 5672,
PrefetchTaskCount = 3
}, new WorkerPoolSetting() { WorkerUnitCount = 9, CommandLine = "dotnet", Arguments = @"./ProcessBin/WorkerProcessSample.dll", QueueName = Environment.GetEnvironmentVariable("QUEUENAME"), }
);

}

worker process 與 workerPool 之間的協議

worker node 與任務進程之間的協議使用 MessagePack 二進制格式來進行更快且更小的資料傳輸,標準輸入將發送信號來控制 worker process。

一開始 workerPool 將通過標準輸入傳遞 NamedPipe 名稱,因此 worker node 需要接收該名稱並建立 worker process 和 workerPool 之間的 NamedPipe。

workerPool 發送的操作指令

目前,workerPool將通過標準輸入向 worker process 發送操作信號或指令。

  • CLOSED_SIGNAL (__quit__): 代表 workerPool 發送關閉或關機信號給 worker node,worker process 應盡快執行優雅關機。
    通過 (Data Named Pipe Stream) 進行資料傳輸
    命名管道是一種強大的進程間通信 (IPC) 機制,它允許兩個或更多的進程之間進行通信,即使它們運行在不同的機器上(例如 Windows 等支持的平台)。我們的 worker 使用此方式在 worker node 與 workerPool 之間傳輸資料。

msgpack 協議支持的資料類型如下類別與 byte[] 格式。

對應的 byte[] 資料是:

1
2
[132,161,48,179,78,101,119,32,79,117,116,80,117,116,32,77,101,115,115,97,103,101,33,161,49,204,200,161,50,129,164,116,101,115,116,167,116,101,115,116,118,97,108,161,51,169,116,101,115,116,81,117,101,117,101]

要將提供的偽 JSON 結構表示為 MsgPack 格式(byte[]),我們可以分解過程如下:

1
2
3
4
5
6
7
8
9
Edit
{
"0": "New OutPut Message!",
"1": 200,
"2": {
"test": "testval"
},
"3": "testQueue"
}

更多資訊,您可以使用 msgpack-converter 來解碼和編碼。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 /// <summary>
/// 封裝來自 MQ 服務的訊息
/// </summary>
[MessagePackObject]
public class MessageOutputTask
{
/// <summary>
/// 來自進程的輸出訊息
/// </summary>
[Key("0")]
public string Message { get; set; }
[Key("1")]
public MessageStatus Status { get; set; }
/// <summary>
/// 我們希望儲存的回應資訊以便繼續執行訊息。
/// </summary>
[Key("2")]
[MessagePackFormatter(typeof(PrimitiveObjectResolver))]
public IDictionary<string, object> Headers { get; set; }
/// <summary>
/// 預設使用 BasicProperties.Reply To 隊列名稱,任務處理器可以覆寫回應隊列名稱。
/// </summary>
/// <value>預設使用 BasicProperties.Reply</value>
[Key("3")]
public string ReplyQueueName { get; set; }
}

我將在此介紹 MessageStatus 的含義。

  • IGNORE_MESSAGE (-1) : 將訊息附加到資料流管道中,而不進行進一步處理。
    • Status = -1: 任務處理告訴 worker process 這不是回應或確認訊息,只是回饋到資料流管道。
  • MESSAGE_DONE (200) : 通知 worker process 該案件可以由訊息隊列服務進行確認。
    • Status = 200 任務處理告訴 worker process 該任務已完成並且可以確認。
  • MESSAGE_DONE_WITH_REPLY (201) : 請確保我們滿足以下步驟以支援 RPC。
    • 客戶端代碼必須提供 ReplyTo 資訊。
    • 任務處理將使用 JSON 負載中的 Message 欄位來回應隊列資訊。
    • 例如:當 Status = 201 透過資料流管道發送時,任務處理指示 worker process 輸出,例如 1010,該數據必須然後發送到回應隊列。

我們可以通過不同的程式語言來編寫自己的 worker node (我已經在此 github 提供了 Python, .NET, rust example code)。

如何處理長時間運行的任務或涉及處理大量數據行的任務?

類似於操作系統中的進程,發生上下文切換(中斷等)。

客戶端可以通過 Header 發送一個 TimeoutMilliseconds 值:在取消之前等待的時間(毫秒)。如果任務執行超過該值,worker process 可以使用該值來設置中斷,例如 CancellationToken。

例如,MessageOutputTask 的 JSON 可以如下所示,status=201 代表此訊息將重新入隊以便下次處理,並且訊息將攜帶 Headers 資訊再次重新入隊。

1
2
3
4
5
6
7
8
9
10
11
{
"Message": "This is Mock Json Data",
"Status": 201,
"Headers": {
"CreateTimestamp": "2025-01-01T14:35:00Z",
"PreviousProcessingTimestamp": "2025-01-01T14:40:00Z",
"Source": "OrderProcessingService",
"PreviousExecutedRows": "123",
"RequeueTimes": "3"
}
}

此專案還包括 integration、unit test 和 github action pipeline。雖然 API 文件(專案仍在 beta 階段),但我計劃在未來逐步添加。如果您對此專案有任何想法或建議,請隨時創建問題或發送 PR。

__此文作者__:Daniel Shih(石頭)
__此文地址__: https://isdaniel.github.io/mq-workerpool-introduction/
__版權聲明__:本博客所有文章除特別聲明外,均採用 CC BY-NC-SA 3.0 TW 許可協議。轉載請註明出處!

如果本文對您幫助很大,可街口支付斗內鼓勵石頭^^