Summary: I argue that using a library is the best design pattern. The Haskell ecosystem offers powerful tools for writing concurrent programs.
In a previous post I demonstrated some of Haskell's features to write concurrent programs. I developed a simulated search engine which looked like this:
search30 :: SearchQuery -> IO () search30 query = do req <- timeout maxDelay $ mapConcurrently (fastest query) [Web, Image, Video] printResults req
I argued that this was a fast, concurrent, replicated and robust version of the previous iterations. In the following sections I am going to improve this example, therefore I suggest reading the related post to understand how we got here.
The faster wins
In the heart of the
search30 function we find this helper function:
fastest :: SearchQuery -> SearchKind -> IO String fastest query kind = do req <- race (fakeSearch query kind) -- server 1 (fakeSearch query kind) -- server 2 return $ case req of Left r -> "Server1: " ++ r Right r -> "Server2: " ++ r
fastest sends the search query to two replicas of the search backend and
keeps the result from the faster responding server. The
race function from
the async library runs two actions concurrently and returns the result
of the faster. The slower action is terminated.
In real code you would replace the
fakeSearch function with an appropriate
search API call but the structure of the function would not change
For the sake of this post we explicitly prepend the name of the server from which the result came. In a real application this would not be displayed to the user. However, we could still store some information about the response's origin for further analysis.
This implementation of
fastest works well, but only in the special case of
two servers. What if we want to use more replicas? Let's see how we can add
support for any number of back-end servers.
More general race
Instead of racing two processes, we start
N instances of the
action concurrently and receive the result of the fastest. This is a recurring
pattern in concurrent programming: start many computations and signal if any
of them completed execution. In many programming languages, for example
Python and C#, you find library functions to solve
this problem. The naming and the implementation slightly changes from one
language to another, but the underlying concept is the same.
race function served us well, so let's keep digging in the async
library for something that can help us out again. It doesn't take too long to
locate these two functions:
waitAny :: [Async a] -> IO (Async a, a) -- Wait for any of the supplied Asyncs to complete. waitAnyCancel :: [Async a] -> IO (Async a, a) -- Like waitAny, but also cancels the other asynchronous operations as soon as one has completed.
This looks promising: we can use
waitAnyCancel to achieve the same behavior
race but for a list of asynchronous operations.
We cannot just replace
waitAnyCancel because the type signatures
of the two functions are different. For reference, this is
race :: IO a -> IO b -> IO (Either a b)
We need to think a bit more how to fit
The central type of the async library,
Async a, represents an asynchronous
operation that yields a value of type
a. Asynchronous operations can be
spawned using the
async :: IO a -> IO (Async a)
The type of the expression
fakesearch query kind is
IO String. Feeding
async (fakesearch query kind), we get an
IO (Async String)
which launches the search asynchronously in a separate thread. We could
N times and use
waitAnyCancel to select the fastest query.
However, if we went down this path we would be unable to identify which server
replica gave us the fastest answer. We would only get back the search
result, but not the winner replica's identity.
When we used
race it was easy to identify the faster replica because we used
pattern matching on the returned
waitAnyCancel does not give
us any hint on which action in the provided list was the fastest.
IO a is a functor, we can use
fmap (usually abbreviated as
<$>) to apply a function on the result of the search operation. Let's write
a function which prepends the server's identity to the search result:
-- prepend the server's identity to the search result servedBy i result = "Server " ++ (show i) ++ ": " ++ result
For example, in case of the second server replica the transformed search action
servedBy 2 <$> fakeSearch query kind. The type of this
expression is still
IO String but it yields the search result with the server
Let's go over the series of transformation steps and see how the expressions and their types were modified:
-- the search API fakesearch :: SearchQuery -> SearyKind -> IO String -- fill in the arguments to get IO action that yields a string fakesearch query kind :: IO String -- prepend second replica's identity to the search result servedBy 2 <$> fakesearch query kind :: IO String -- a function which prepends the provided replica number to the search result \i -> servedBy i <$> fakesearch query kind :: Int -> IO String -- same as before but as an asyncronous action \i -> async (servedBy i <$> fakesearch query kind) :: Int -> IO (Async String)
In the last two lambda-expressions I kept the replica number as a free parameter, a form we can reuse in the final step.
Let's assemble the new
fastest implementation from the pieces we have:
fastest :: SearchQuery -> SearchKind -> IO String fastest query kind = do requests <- forM [1..numReplicas] $ \i -> -- ① async (servedBy i <$> fakeSearch query kind) -- <$> is `fmap` -- ② (_, result) <- waitAnyCancel requests -- ③ return result -- ④ where numReplicas = 3 servedBy i result = "Server " ++ show i ++ ": " ++ result
forM to transform a list of integers, the replica identifiers, by
applying the provided function.
requests has a type
② The second argument of
forM is the transformation function which creates
an asynchronous operation yielding the search result with the replica's
waitAnyCancel and extract the result from the fastest search
operation (we don't use first element of the returned tuple)
④ Provide result of the current IO operation
This implementation, only six lines of code, works with any number of
numReplicas=2 its behavior is identical to that of the old
We replaced the original implementation of the
fastest function, only capable
of supporting two server replicas, into a more general one which works with any
number of replicas.
The implementation relies on a single library function
waitAnyCancel but we
needed to arrange the asynchronous search operations in a way that is
waitAnyCancel's type signature. We used generic combinators
forM to achieve this. The
search30 requires no
modifications, as the type signature of
fastest didn't change.
We wrote concurrent code with no locks, no mutexes and no concurrent design
patterns to remember. We manipulate asynchronous
computations as values and call library functions. This is possible because
the async library exposes generic and composable primitives and hides
the complexity of thread management. Also the language provides powerful
combinators such as
fmap for implementing our programs.
You can find the examples given here as executable code on GitHub.