Message Queues are one of the best ways to handle asynchronous application processes.
A little about message queues: in a message queue, an application called a producer sends messages to a queue, and a consumer attached to the queue processes them. A traditional message queue has a single consumer attached to it, because each message is delivered to only one consumer. In some scenarios, however, you might need multiple consumers to handle the incoming messages concurrently. To achieve this, you can combine the message queue with a publish/subscribe (pub/sub) messaging pattern using a fanout design pattern.
What is a pub/sub messaging pattern? It is a messaging pattern in which producers (publishers) send messages to a topic or channel, and multiple consumers (subscribers) can each receive those messages from the same topic.
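To make the fanout idea concrete, here is a minimal in-memory sketch using only the Go standard library. It is not RabbitMQ code: the Topic type and its Subscribe, Publish, and Close methods are hypothetical names made up for illustration, and a real broker handles delivery, buffering, and failures very differently. The point is only that every subscriber receives its own copy of each message.

```go
package main

import (
	"fmt"
	"sync"
)

// Topic is a hypothetical in-memory stand-in for a fanout exchange:
// every subscriber gets its own copy of each published message.
type Topic struct {
	mu   sync.Mutex
	subs []chan string
}

// Subscribe registers a new subscriber and returns its private channel.
func (t *Topic) Subscribe(buffer int) <-chan string {
	t.mu.Lock()
	defer t.mu.Unlock()
	ch := make(chan string, buffer)
	t.subs = append(t.subs, ch)
	return ch
}

// Publish copies msg to every subscriber channel.
// It blocks if any subscriber's buffer is full.
func (t *Topic) Publish(msg string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, ch := range t.subs {
		ch <- msg
	}
}

// Close signals all subscribers that no more messages will arrive.
func (t *Topic) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, ch := range t.subs {
		close(ch)
	}
}

func main() {
	topic := &Topic{}
	a := topic.Subscribe(4)
	b := topic.Subscribe(4)

	topic.Publish("message: 1")
	topic.Publish("message: 2")
	topic.Close()

	// Both subscribers see both messages.
	for msg := range a {
		fmt.Println("subscriber A got", msg)
	}
	for msg := range b {
		fmt.Println("subscriber B got", msg)
	}
}
```

Contrast this with a plain queue, where the two readers would share one channel and each message would reach only one of them.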
In this article, I will demonstrate how to set up a message queue with a single consumer attached to it; that consumer will spin up a pool of workers to handle tasks concurrently.
First, let’s set up the message queue.
Initialize your Go project and create a new file called producer.go.
This will be the code that sends messages to our queue.
On running the code, you should see the following logs:
% go run producer.go
Published message 1
Published message 2
Published message 3
Published message 4
Published message 5
Published message 6
Published message 7
Published message 8
Published message 9
Published message 10
done publishing messages
Now let’s create a consumer that reads messages from the queue. For now, it will only log them.
On running this code, the following output should be displayed:
% go run consumer.go
Successfully connected to RabbitMQ instance
Waiting for messages...
Received Message: message: 1
Received Message: message: 2
Received Message: message: 3
Received Message: message: 4
Received Message: message: 5
Received Message: message: 6
Received Message: message: 7
Received Message: message: 8
Received Message: message: 9
Received Message: message: 10
As you can see, the consumer has only a single goroutine doing the processing. But what if we set up multiple goroutines (let’s refer to them as workers) so that messages can be processed concurrently?
Let’s modify the consumer like this:
With this new approach, the output of the consumer will look like this:
% go run consumer.go
Successfully connected to RabbitMQ instance
2023/08/07 12:49:31 Worker [4] running
2023/08/07 12:49:31 Worker [1] running
2023/08/07 12:49:31 Worker [0] running
2023/08/07 12:49:31 Worker [2] running
2023/08/07 12:49:31 Worker [3] running
2023/08/07 12:49:34 Worker [1] processing task: (message: 2)
2023/08/07 12:49:34 Worker [4] processing task: (message: 1)
2023/08/07 12:49:34 Worker [2] processing task: (message: 4)
2023/08/07 12:49:34 Worker [0] processing task: (message: 3)
2023/08/07 12:49:34 Worker [1] processing task: (message: 6)
2023/08/07 12:49:34 Worker [3] processing task: (message: 5)
2023/08/07 12:49:34 Worker [2] processing task: (message: 8)
2023/08/07 12:49:34 Worker [4] processing task: (message: 7)
2023/08/07 12:49:34 Worker [0] processing task: (message: 9)
2023/08/07 12:49:34 Worker [1] processing task: (message: 10)
Notice how each worker starts running and waits for tasks on the task channel. When the producer sends 10 tasks, each of the 5 workers picks up a task, processes it, and then moves on to the next available task.
So what is going on in the code? We first define a worker struct that holds the worker’s ID, the channel it receives tasks from, and a WaitGroup used to keep track of the active workers. Next, we add a method to the struct called run(). Inside this method, a for-range loop receives tasks from the channel and handles them.
In the main function of this modified code, after setting up the queue, we declare a task channel to hold incoming tasks and a TOTAL_WORKERS variable, which is the maximum number of workers (goroutines) started when the application runs. As mentioned earlier, each worker handles separate tasks concurrently. We then start each worker in a for loop. I set up just 5 workers here; you might need more in a high-traffic application (but be mindful of resources). Before starting each worker, we increment the WaitGroup by 1 so that the main function waits for all workers to finish before exiting. Finally, we read the messages coming from the queue with a for-range loop and, inside it, pass each task into the task channel.
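The structure described above can be sketched with the standard library alone. This is a stand-in under stated assumptions, not the article's consumer: a slice of strings takes the place of the RabbitMQ delivery stream, and the field names (ID, Tasks, WG), the handle callback, and the processAll helper are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"log"
	"sync"
)

// TOTAL_WORKERS is the number of goroutines in the pool.
const TOTAL_WORKERS = 5

// Worker holds an ID, the shared task channel, and a WaitGroup.
// The field names are assumptions; the article only describes the struct.
type Worker struct {
	ID    int
	Tasks <-chan string
	WG    *sync.WaitGroup
}

// run receives tasks until the channel is closed, then marks the worker done.
func (w Worker) run(handle func(workerID int, task string)) {
	defer w.WG.Done()
	for task := range w.Tasks {
		handle(w.ID, task)
	}
}

// processAll starts totalWorkers workers, feeds them every task, waits for
// them to finish, and returns how many tasks each worker handled.
func processAll(totalWorkers int, tasks []string) map[int]int {
	taskCh := make(chan string, 10) // bounded buffer, as in the article
	var wg sync.WaitGroup
	var mu sync.Mutex
	handled := make(map[int]int)

	handle := func(workerID int, task string) {
		log.Printf("Worker [%d] processing task: (%s)", workerID, task)
		mu.Lock()
		handled[workerID]++
		mu.Unlock()
	}

	for i := 0; i < totalWorkers; i++ {
		wg.Add(1) // increment before starting the goroutine
		w := Worker{ID: i, Tasks: taskCh, WG: &wg}
		go w.run(handle)
	}

	// Stand-in for the for-range loop over queue deliveries.
	for _, t := range tasks {
		taskCh <- t
	}
	close(taskCh) // lets each worker's for-range loop end
	wg.Wait()
	return handled
}

func main() {
	var tasks []string
	for i := 1; i <= 10; i++ {
		tasks = append(tasks, fmt.Sprintf("message: %d", i))
	}

	total := 0
	for _, n := range processAll(TOTAL_WORKERS, tasks) {
		total += n
	}
	fmt.Println("tasks processed:", total)
}
```

Which worker picks up which task varies from run to run, but every task is handled exactly once. Unlike a long-running consumer, this sketch closes the task channel so the workers exit and wg.Wait() returns.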
Note that the capacity of the task channel is set to 10, which means only 10 tasks can be buffered in the channel before sends to it start to block. When the buffer is full, the loop that feeds the channel has to wait for a worker to free a slot before it can pass in another task. This avoids excessive memory usage, especially if the tasks are memory-intensive.
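To see what a full buffer looks like, the sketch below fills a channel of capacity 10 without any workers reading from it. The tryEnqueue helper is a hypothetical name; it uses a non-blocking send (select with a default case) only so that the full state is observable. A plain send, which is what the text above describes, would simply wait at that point instead of returning false.

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send and reports whether the task
// fit into the channel's buffer.
func tryEnqueue(tasks chan<- string, task string) bool {
	select {
	case tasks <- task:
		return true
	default:
		return false // buffer is full; a plain send would block here
	}
}

func main() {
	tasks := make(chan string, 10) // same capacity as in the article

	accepted := 0
	for i := 1; i <= 12; i++ {
		if tryEnqueue(tasks, fmt.Sprintf("message: %d", i)) {
			accepted++
		}
	}
	fmt.Printf("accepted %d of 12, buffered %d/%d\n", accepted, len(tasks), cap(tasks))

	<-tasks // a worker taking one task frees one slot
	fmt.Println("after one receive, enqueue succeeds:", tryEnqueue(tasks, "message: 13"))
}
```

With no receiver, only the first 10 sends are accepted; once a single task is taken off the channel, the next send goes through.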
Here are some of the major benefits of using this worker approach to handling tasks in a queue:
Concurrency: Worker pools allow concurrent processing of tasks, which can significantly improve the overall throughput and performance of the application, especially when dealing with a large number of long-running tasks.
Scalability: As the workload increases, you can easily scale the system by increasing the number of workers in the pool, allowing it to handle more tasks concurrently.
While this approach is a good idea, it does have some potential issues to consider:
Message Ordering: Worker pools do not guarantee the order of task processing. If the order of processing is critical, additional measures need to be taken (e.g., a timestamp in the message) to maintain the desired order.
Potential Bottlenecks: If the processing of tasks is significantly slower than message consumption, the queue might back up, leading to higher message latency and potential message loss if the queue fills up.
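For the message-ordering concern, one possible measure is sketched below. It assumes the producer attaches a sequence number to each message (used here in place of the timestamp mentioned above), and it restores the order of the results after the workers finish; it does not control the order in which workers process tasks. The Task and Result types and the processInOrder function are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Task carries a sequence number assigned by the producer (an assumed field).
type Task struct {
	Seq  int
	Body string
}

// Result is the output of processing a Task, keeping its sequence number.
type Result struct {
	Seq    int
	Output string
}

// processInOrder fans tasks out to a pool of workers, collects the results
// in whatever order they complete, then sorts them back into producer order.
func processInOrder(tasks []Task, workers int) []Result {
	in := make(chan Task)
	out := make(chan Result, len(tasks)) // large enough that workers never block
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range in {
				out <- Result{Seq: t.Seq, Output: "processed " + t.Body}
			}
		}()
	}

	for _, t := range tasks {
		in <- t
	}
	close(in)
	wg.Wait()
	close(out)

	var results []Result
	for r := range out {
		results = append(results, r)
	}
	sort.Slice(results, func(i, j int) bool { return results[i].Seq < results[j].Seq })
	return results
}

func main() {
	var tasks []Task
	for i := 1; i <= 10; i++ {
		tasks = append(tasks, Task{Seq: i, Body: fmt.Sprintf("message: %d", i)})
	}
	for _, r := range processInOrder(tasks, 5) {
		fmt.Println(r.Seq, r.Output)
	}
}
```

This only works for a bounded batch, because it waits for all workers before sorting. A continuous stream would need a different approach, such as a reorder buffer or routing related messages to the same worker.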
In conclusion, the worker pool pattern is a valuable tool for concurrent task handling in many scenarios. With a clear understanding of its benefits and considerations, developers can effectively design and implement efficient task-processing systems to meet their application requirements.