Introduction
Recently, I developed the MessageWorkerPool project. The main concept is to provide a framework that lets users implement their logic quickly and easily inside workers. The design is highly flexible: workers can be implemented in multiple languages based on a worker protocol I created. Currently, I've provided examples for workers written in C#, Rust, and Python.
This library excels in handling tasks within multi-process environments, especially for applications demanding high throughput and low latency. It also supports graceful shutdown, ensuring a smooth process termination without disrupting ongoing tasks.
Why Process Pool rather than Thread Pool?
Use a process pool when you need robust isolation, so that a failure in one task cannot affect the others; this matters for critical or crash-prone operations. A thread pool is more lightweight (threads share memory and incur less context-switching overhead), but a process pool offers a more flexible solution because each worker can be implemented in a different programming language.
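To make the isolation point concrete, here is a minimal Python sketch (not part of MessageWorkerPool, purely to illustrate the trade-off): one task in a process pool fails, and the others complete untouched because each runs in its own process.

```python
from concurrent.futures import ProcessPoolExecutor

def risky_square(n):
    """Each call runs in its own process with its own memory."""
    if n == 3:
        raise ValueError("task 3 failed")
    return n * n

def run_all(numbers):
    """Collect results; a failure in one task leaves the rest untouched."""
    results = {}
    with ProcessPoolExecutor(max_workers=2) as pool:
        futures = {n: pool.submit(risky_square, n) for n in numbers}
        for n, future in futures.items():
            try:
                results[n] = future.result()
            except ValueError as exc:
                results[n] = f"error: {exc}"
    return results

if __name__ == "__main__":
    print(run_all(range(5)))
```

A thread pool would give the same exception-handling behavior here, but only separate processes also contain crashes, memory corruption, and native-code failures.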
Installation
To install the MessageWorkerPool package, use the following NuGet command:

```shell
PM> Install-Package MessageWorkerPool
```
To install the library from source, clone the repository and build the project:

```shell
git clone https://github.com/isdaniel/MessageWorkerPool.git
```
Architecture overview
Quick Start
Here's a quick start guide for deploying RabbitMQ and related services using the provided docker-compose.yml file and the environment variables from .env.

```shell
docker-compose --env-file .\env\.env up --build -d
```
- Check the RabbitMQ health status: open http://localhost:8888 in your browser to access the RabbitMQ Management Dashboard.
  - Username: guest
  - Password: guest
- Check the OrleansDashboard at http://localhost:8899.
  - Username: admin
  - Password: test.123
Program Structure
Here is the sample code for creating and configuring a worker pool that interacts with RabbitMQ. Below is a breakdown of its functionality: the worker pool fetches messages from the RabbitMQ server according to your `RabbitMqSetting` configuration and sends each message via `Process.StandardInput` to the real worker node created by the user.

```csharp
public class Program
```
Scalability
- Scaling is achieved by increasing the `WorkerUnitCount` and `PrefetchTaskCount`, which determine how many messages are fetched from RabbitMQ at the same time.
Decoupling
- RabbitMQ acts as a message broker, decoupling the producers of messages from the consumers (workers). This makes it easier to manage workloads independently.
Configurable
- The `RabbitMqSetting` object provides flexibility to modify connection settings, queue names, and worker pool details without changing the code.
Reusable Workers
- Worker processes are defined by the `CommandLine` and `Arguments` settings, making it easy to reuse or swap out the tasks performed by the workers.
Protocol between worker and task process
The protocol between the worker pool and the task process uses the MessagePack binary format for faster and smaller data transfer; standard input is used to send control signals to the worker process.
At startup, the worker pool passes a named-pipe name through standard input, so the worker program needs to receive that name and establish the named pipe between the worker process and the worker pool.
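As a rough illustration of this handshake, a Python worker might read the pipe name like this. This is a minimal sketch: the helper names and the pipe-path conventions are assumptions for illustration, not the library's actual API.

```python
import os
import sys

def read_pipe_name(stdin=None):
    """Read the named-pipe name the worker pool sends as the first stdin line."""
    stdin = stdin or sys.stdin
    return stdin.readline().strip()

def pipe_path(pipe_name):
    """Build a platform-specific path for the pipe (illustrative assumption)."""
    if os.name == "nt":
        return rf"\\.\pipe\{pipe_name}"  # Windows named-pipe namespace
    return f"/tmp/{pipe_name}"           # e.g. a Unix FIFO created by the pool

# A worker would now open pipe_path(read_pipe_name()) and exchange
# msgpack payloads with the worker pool over that stream.
```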
Operation command from worker pool
Currently, the worker pool sends operation signals or commands to the worker process via standard input.
- CLOSED_SIGNAL (`__quit__`): the worker pool sent a close or shutdown signal to the worker process; the worker process should perform a graceful shutdown as soon as possible.
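A worker's control loop can honor this signal with something like the following sketch (assuming one command per stdin line; the function names are illustrative, not the library's API):

```python
CLOSED_SIGNAL = "__quit__"

def control_loop(lines, handle):
    """Consume control lines until the pool sends __quit__, then return
    the number of commands handled so in-flight work can be drained."""
    handled = 0
    for line in lines:
        command = line.strip()
        if command == CLOSED_SIGNAL:
            break  # graceful shutdown: stop taking new work
        handle(command)
        handled += 1
    return handled
```

In a real worker, `lines` would be `sys.stdin` and `handle` would dispatch the incoming task payloads.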
Data transfer via (Data Named Pipe Stream)
Named pipes are a powerful interprocess communication (IPC) mechanism that allows two or more processes to communicate with each other, even across machines on a network (on platforms that support it, such as Windows). Our workers use this mechanism to transfer data between the worker process and the worker pool.
The data types supported by the msgpack protocol follow the class and `byte[]` format below.

The corresponding `byte[]` data is:
```
[132,161,48,179,78,101,119,32,79,117,116,80,117,116,32,77,101,115,115,97,103,101,33,161,49,204,200,161,50,129,164,116,101,115,116,167,116,101,115,116,118,97,108,161,51,169,116,101,115,116,81,117,101,117,101]
```
To represent the provided pseudo-JSON structure in the MsgPack format (`byte[]`), we can break down the process as follows:

```json
{
  "0": "New OutPut Message!",
  "1": 200,
  "2": {
    "test": "testval"
  },
  "3": "testQueue"
}
```
For more information, you can use msgpack-converter to decode and encode messages.
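To see exactly how those bytes come about, here is a minimal pure-Python encoder covering just the MessagePack pieces used in this example (fixmap, fixstr, positive fixint, and uint8). It is a didactic sketch, not a replacement for a real msgpack library.

```python
def pack(value):
    """Encode str/int/dict into MessagePack bytes (subset for this example)."""
    if isinstance(value, str):
        raw = value.encode("utf-8")
        assert len(raw) < 32, "fixstr only in this sketch"
        return bytes([0xA0 | len(raw)]) + raw      # fixstr: 0xA0 | length
    if isinstance(value, bool):
        raise NotImplementedError("bool not needed for this example")
    if isinstance(value, int):
        if 0 <= value < 0x80:
            return bytes([value])                  # positive fixint
        if value < 0x100:
            return bytes([0xCC, value])            # uint8, marker 0xCC (204)
        raise NotImplementedError("larger ints not needed for this example")
    if isinstance(value, dict):
        assert len(value) < 16, "fixmap only in this sketch"
        out = bytearray([0x80 | len(value)])       # fixmap: 0x80 | entry count
        for key, item in value.items():
            out += pack(key) + pack(item)
        return bytes(out)
    raise TypeError(type(value))

payload = {
    "0": "New OutPut Message!",
    "1": 200,
    "2": {"test": "testval"},
    "3": "testQueue",
}
encoded = pack(payload)  # reproduces the byte[] shown above
```

For instance, the leading 132 is `0x84` (a fixmap with 4 entries), and `204, 200` is the uint8 encoding of status 200.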
```csharp
/// <summary>
```

```csharp
/// <summary>
```
Let me introduce the meaning of `MessageStatus` here.
IGNORE_MESSAGE (-1): append the message to the data streaming pipeline without further processing.

`Status = -1`: the task process tells the worker that this is neither a response nor an ack message, only feedback to the data streaming pipeline.
MESSAGE_DONE (200): notify the worker that this case can be acknowledged by the message queue service.

`Status = 200`: the task process tells the worker the task can be acked, meaning it has finished.
MESSAGE_DONE_WITH_REPLY (201): please ensure the following steps are satisfied to support RPC.

- The client-side code must provide the `ReplyTo` information.
- The task process will use the `Message` column in the JSON payload to reply with the queue information.
- Here is an example: when `Status = 201` is sent via the data streaming pipeline, the task process instructs the worker to send the output, such as `1010`, to the reply queue.
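The statuses above can be summarized as a Python enum. This is a convenience sketch for worker authors; the protocol itself just carries the integer values, and `should_ack` is an illustrative helper, not part of the library.

```python
from enum import IntEnum

class MessageStatus(IntEnum):
    """Status codes a task process writes back to the worker."""
    IGNORE_MESSAGE = -1            # stream-only feedback: no ack, no reply
    MESSAGE_DONE = 200             # finished: the worker may ack the message
    MESSAGE_DONE_WITH_REPLY = 201  # finished: also send Message to ReplyTo queue

def should_ack(status):
    """Only terminal statuses allow the queue message to be acknowledged."""
    return status in (MessageStatus.MESSAGE_DONE,
                      MessageStatus.MESSAGE_DONE_WITH_REPLY)
```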
Example `byte[]` data:
```
[130, 161, 48, 179, 78, 101, 119, 32, 79, 117, 116, 80, 117, 116, 32, 77, 101, 115, 115, 97, 103, 101, 33, 161, 49, 204, 200]
```
The corresponding JSON decoded from the `byte[]`:

```json
{
  "0": "New OutPut Message!",
  "1": 200
}
```
We can write our own workers in different programming languages (I have provided Python and .NET samples in this repository).
How do we handle a long-running task, or a task that involves processing a lot of data rows?

The concept is similar to how an OS handles threads: an interrupt triggers a context switch. The client can send a `TimeoutMilliseconds` value via the header: the time span (in milliseconds) to wait before canceling the task. If execution exceeds this value, the worker process can use it to set up an interrupt, such as a `CancellationToken`.
For example, the `MessageOutputTask` JSON could look like the example below. `Status = 201` indicates that this message will be re-queued for processing next time, and the message will carry the `Headers` information when it is requeued.

```json
{
```
This project also includes integration tests, unit tests, and an automation pipeline. While the API documentation and related materials are not yet complete (the project is still in beta), I plan to gradually add them in the future. If you have any thoughts or suggestions about this project, please feel free to create an issue or send a PR.
__Author__: Daniel Shih (石頭)
__Original post__: https://isdaniel.github.io/mq-workerpool-introduction/
__Copyright notice__: Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 3.0 TW. Please credit the source when reposting!