In this post we are going to write a simple load balancer in Haskell. The design is based on that presented in Rob Pike's Concurrency Is Not Parallelism talk (starting around 22 minutes). If you are not familiar with this presentation I highly recommend watching it before reading on. Pike presents an interesting load balancer design and its implementation to show off Go's built-in concurrency primitives. Just like last year, I am interested how this example would look in Haskell, a purely functional programming language.
I am presenting some code fragments of the implementation omitting some less-important, plumbing details. You can find the complete executable code in this repository.
A load balancer distributes workload across multiple computing units. Load balancing aims to optimize resource use, maximize throughput, minimize response time, and avoid overload of any single resource. I will call our "computing unit" simply Worker.
I have reproduced here the outline of the system we are going to implement:
To requester <──┐ ┌───────────┐ │ │ Requester │────┐ ┌──────────┐ ┌────────┐ │ └───────────┘ │ │ │─────> │ Worker │──┘ │ │ │ └────────┘ ┌───────────┐ └─> │ │ ┌────────┐ │ Requester │──────> │ Balancer │─────> │ Worker │────┐ └───────────┘ ┌─> │ │ └────────┘ │ │ │ │ ┌────────┐ │ ┌───────────┐ │ │ │─────> │ Worker │──┐ │ │ Requester │────┘ └──────────┘ └────────┘ │ │ └───────────┘ │ │ To requester <──┘ │ <────┘
The design comprises three components:
- Requester: Sends a unit of work to the system and waits for the result.
- Balancer: Receives requests and distributes them among the available workers.
- Worker: Executes the requested the work and sends back the result to the Requester.
The components communicate through channels. I shall use unbounded FIFO channels from the stm library
The interesting part about this design is that the results from the workers do not cross the Balancer. The worker directly sends back the result to the requester.
First, let's define the data structure to hold the requests. This is a type of a request that, when executed, yields a value of type
data Request a = Request (IO a) (TChan a)
Request holds an
IO action and a result channel: the action is the executable task at hand, the result channel transmits the result back to the requester. Both the action and the channel are parametrized over the same type: the result of the task must "fit" in the result channel.
The task and the Requester
Let's start imagining how the clients would interact with the load balancer. We generate some work with
randomTask, a function with the signature:
randomTask :: IO Int
Neither the load balancer nor the worker have to know about the internals of this function. In my implementation
randomTask draws a number from a uniform distribution, it sleeps that amount of seconds and returns the number as a result. In other words: the result of
randomTask is the time it took to complete it.
requester function represent the clients who have some work to perform:
requester :: TChan (Request Int) ^ -- input channel of the load balancer -> IO () requester balancer = forever $ do -- simulating random load delayMs <- getStdRandom $ randomR (1, 1000) threadDelay (delayMs * 1000) -- send the request resultChan <- newTChanIO atomically $ writeTChan balancer (Request randomTask resultChan) -- wait for the result async $ atomically $ readTChan resultChan
This function is an infinite loop where each iteration sends a request to the load balancer after a random delay. This is a very primitive way to simulate some load: not realistic, but good enough for now. The requester creates the result channel, packages it up along with the task, sends the request, and waits for the results asynchronously. The next iteration will not be blocked.
Note that the interface to the load balancer is a simple value. The
requester only drops this value in a channel and waits. It does not care about what happens to the
Request on the other end of the channel.
As shown in the design, the
Balancer receives the
Request from a channel and distributes them among the available workers.
The Balancer holds a pool of workers and a completion channel:
data Balancer a = Balancer (Pool a) (TChan (Worker a)) -- Pool and Worker will be defined later
Workers use the completion channel to report to the
Balancer the completion of a task. The
Balancer uses this information to keep track of the load of each worker.
balance :: TChan (Request a) -- ^ input channel to receive work from -> Balancer a -- ^ Balancer -> IO () balance requestChan (Balancer workers doneChannel) = race_ -- ❶ runWorkers (runBalancer workers) where runWorkers = mapConcurrently (`work` doneChannel) (map snd $ toList workers) -- ❷ runBalancer pool = do msg <- -- ❸ atomically $ (WorkerDone <$> readTChan doneChannel) `orElse` (RequestReceived <$> readTChan requestChan) newPool <- case msg of -- ❹ RequestReceived request -> dispatch pool request WorkerDone worker -> return $ completed pool worker runBalancer newPool -- ❺
As expected, this is the most complex part of the system. Let's walk through it step by step:
balancefunction runs the workers and the balancer itself asynchronously. The
race_combinator is from the async package and I talked about it in a previous post. It runs its two arguments concurrently and it terminates if any of those two terminate.
runWorkersconcurrently executes the workers. The function
work :: Worker a -> TChan (Worker a) -> IO ()is part of the Worker API and makes the worker process requests in an infinite loop. I shall present the details of the
Workerin the next section.
The balancer waits for new messages on the request and completion channels.
Depending on the received message
completed. These two functions implement the load balancing strategy and update the state of the worker pool.
runBalancerrecursively calls itself with the updated worker pool.
RequestReceived and the
WorkerDone are data constructors of an internal type
data ControlMessage a = RequestReceived (Request a) | WorkerDone (Worker a)
This type "tags" the incoming message so we can differentiate from where it was originated.
The communication channels of the
Balancer are now wired up; we are ready to implement the load balancing strategy. Following the the original design, I shall implement the worker pool using a heap from the Data.Heap module.
type Pool a = DH.MinPrioHeap Int (Worker a)
This heap stores priority-worker pairs
(Int, Worker a). The priority represents the number of tasks assigned to the worker. Because we are using
MinPrioHeap the pair with minimal priority, that corresponding to the least loaded worker, is extracted first.
The two functions
complete, introduced in the definition of
balance, manipulate the heap of the worker pool. The function
dispatch selects the least loaded worker and schedules the request on it.
dispatch :: Pool a -> Request a -> IO (Pool a) dispatch pool request = do let ((p, w), pool') = fromJust $ DH.view pool -- ❶ schedule w request -- ❷ return $ DH.insert (p + 1, w) pool' -- ❸
The interface of the heap module is unusual, but the code is only three lines:
- The function view extracts the worker with the minimum priority, that is with the lowest number of tasks.
- The function
schedule :: Worker a -> Request a -> IO ()is part of the Worker API and it sends the given request to the extracted worker.
- The function insert puts back the worker into the pool with an increased priority. The return value is the updated pool.
completed function is very similar to
completed :: Pool a -> Worker a -> Pool a completed pool worker = let (p', pool') = DH.partition (\item -> snd item == worker) pool [(p, w)] = toList p' in DH.insert (p - 1, w) pool'
First, we look up the given worker using partition. Second, we put it back into the heap with a decreased priority value.
The last component of the system is the worker itself. The worker comprises an identifier, an integer in this case, and a channel from which it can receive requests:
data Worker a = Worker Int (TChan (Request a)) deriving Eq
This is the worker's main function:
work :: Worker a -> TChan (Worker a) -> IO () work (Worker workerId requestChan) doneChannel = forever $ do Request task resultChan <- atomically $ readTChan requestChan -- ❶ result <- task -- ❷ atomically $ do writeTChan resultChan result -- ❸ writeTChan doneChannel (Worker workerId requestChan) -- ❹
It is an infinite loop which (1) reads from the request channel, (2) performs the task, (3) sends the result to the requester, and (4) reports the completion to the balancer. We have already seen the implementations of other ends of these two channels.
For better encapsulation I provide a short helper function to send a request to the worker:
schedule :: Worker a -> Request a -> IO () schedule (Worker i c) request = atomically $ writeTChan c request
schedule deconstructs the
Worker and sends the provided request to its internal channel. This function allows
Worker to remain opaque and the request channel is not exposed outside the worker's module.
Putting it all together
In the previous sections we have seen all the components: the requester, the balancer, and the workers. We only have to wire everything up:
start :: IO () start = do chan <- newTChanIO -- create a channel balancer <- newBalancer chan 3 -- create a `Balancer` with 3 workers -- run the balancer and the requester concurrently race_ (balance chan balancer) (requester chan)
If you run the complete code you will get an output like:
Balancer: fromList [(0,Worker 2),(1,Worker 3),(0,Worker 1)] Balancer: fromList [(0,Worker 1),(1,Worker 2),(1,Worker 3)] Balancer: fromList [(1,Worker 3),(1,Worker 1),(1,Worker 2)]
You can see that the first three requests were scheduled on different workers. The program continues indefinitely and you should see roughly the same amount of task on each worker.
I presented the Haskell implementation of an interesting load balancer design based on the Go code by Rob Pike. The load balancer gives the incoming request to the least-loaded worker. The components are loosely coupled and they communicate via channels.
Can we deploy this to production? Certainly not. The design seems to scale up to multiple workers and high request rates, but how can we prove this? What kinds of tests could we write for such a concurrent system? Can we test that the balancing strategy is sound? I will try to find answers to these questions in a future post.
The full code can be found in this repository.