Concurrency without magic

Summary: I argue that using a library is the best design pattern. The Haskell ecosystem offers powerful tools for writing concurrent programs.

In a previous post I demonstrated some of Haskell's features for writing concurrent programs. I developed a simulated search engine which looked like this:

search30 :: SearchQuery -> IO ()
search30 query = do
    req <- timeout maxDelay $
        mapConcurrently (fastest query) [Web, Image, Video]
    printResults req

I argued that this was a fast, concurrent, replicated and robust improvement over the previous iterations. In the following sections I am going to improve this example further, so I suggest reading the related post to understand how we got here.
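For readers who have not seen the earlier post, here is a minimal sketch of how timeout and mapConcurrently combine in search30. The helpers slowEcho and runAllWithin are hypothetical names invented for this illustration, and the delays are arbitrary; only timeout (from System.Timeout) and mapConcurrently (from the async library) are real library functions.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (mapConcurrently)
import System.Timeout (timeout)

-- Hypothetical stand-in for a backend call: waits, then returns its label.
slowEcho :: Int -> String -> IO String
slowEcho micros label = do
    threadDelay micros
    return label

-- Run all actions concurrently. The result is Nothing when the whole
-- batch takes longer than the limit (given in microseconds).
runAllWithin :: Int -> [IO a] -> IO (Maybe [a])
runAllWithin limit actions = timeout limit (mapConcurrently id actions)

main :: IO ()
main = do
    -- Both actions finish well inside the half-second limit.
    quick <- runAllWithin 500000 [slowEcho 10000 "web", slowEcho 20000 "image"]
    print quick
    -- The two-second action pushes the batch past the 50 ms limit.
    late <- runAllWithin 50000 [slowEcho 10000 "web", slowEcho 2000000 "video"]
    print late
```

When the limit is exceeded, the actions that are still running are cancelled, so the caller gets Nothing instead of a partial list.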

The faster wins

At the heart of the search30 function we find this helper function:

fastest :: SearchQuery -> SearchKind -> IO String
fastest query kind = do
    req <- race (fakeSearch query kind) -- server 1
                (fakeSearch query kind) -- server 2

    return $ case req of
        Left  r -> "Server1: " ++ r
        Right r -> "Server2: " ++ r

fastest sends the search query to two replicas of the search backend and keeps the result from the server that responds first. The race function from the async library runs two actions concurrently and returns the result of whichever finishes first. The slower action is cancelled.
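To see race in isolation, here is a small sketch with two actions whose delays are fixed, so the outcome is predictable. The replica helper and its delays are assumptions made for this example and stand in for fakeSearch.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (race)

-- Hypothetical stand-in for a search replica: waits, then answers.
replica :: Int -> String -> IO String
replica micros answer = threadDelay micros >> return answer

main :: IO ()
main = do
    -- The first action needs 300 ms, the second only 10 ms.
    winner <- race (replica 300000 "slow answer") (replica 10000 "fast answer")
    putStrLn $ case winner of
        Left  r -> "first action won: " ++ r
        Right r -> "second action won: " ++ r
```

With these delays the second action should win, and the program returns long before the 300 ms of the slower action have elapsed because that action is cancelled.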

In real code you would replace the fakeSearch function with an appropriate search API call but the structure of the function would not change significantly.

For the sake of this post we explicitly prepend the name of the server from which the result came. In a real application this would not be displayed to the user. However, we could still store some information about the response's origin for further analysis.

This implementation of fastest works well, but only in the special case of two servers. What if we want to use more replicas? Let's see how we can add support for any number of back-end servers.

More general race

Instead of racing two processes, we start N instances of the fakeSearch action concurrently and receive the result of the fastest. This is a recurring pattern in concurrent programming: start many computations and signal when any of them completes. In many programming languages, for example Python and C#, you find library functions to solve this problem. The naming and the implementation differ slightly from one language to another, but the underlying concept is the same.

The race function served us well, so let's keep digging in the async library for something that can help us out again. It doesn't take too long to locate these two functions:

waitAny :: [Async a] -> IO (Async a, a)
-- Wait for any of the supplied Asyncs to complete.

waitAnyCancel :: [Async a] -> IO (Async a, a)
-- Like waitAny, but also cancels the other asynchronous operations as soon as one has completed.

This looks promising: we can use waitAnyCancel to achieve the same behavior as race but for a list of asynchronous operations.

We cannot just replace race with waitAnyCancel because the type signatures of the two functions are different. For reference, this is race:

race :: IO a -> IO b -> IO (Either a b)

We need to think a bit more about how to fit waitAnyCancel into fastest.

Building blocks

The central type of the async library, Async a, represents an asynchronous operation that yields a value of type a. Asynchronous operations can be spawned using the async function:

async :: IO a -> IO (Async a)

The type of the expression fakeSearch query kind is IO String. Feeding this to async, as in async (fakeSearch query kind), we get an IO (Async String) which, when executed, launches the search asynchronously in a separate thread. We could replicate this N times and use waitAnyCancel to select the fastest query. However, if we went down this path we would have no direct way to identify which server replica gave us the fastest answer: the search result itself says nothing about the winning replica's identity.
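The naive approach described above can be sketched as follows. stubSearch and firstOf are hypothetical names used only for this illustration; stubSearch replaces fakeSearch with a fixed delay and a canned answer.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, waitAnyCancel)
import Control.Monad (replicateM)

-- Hypothetical stand-in for fakeSearch: a short delay, then a canned result.
stubSearch :: IO String
stubSearch = threadDelay 10000 >> return "result"

-- Start n copies of the same action and keep whichever finishes first.
-- Assumes n >= 1: with an empty list there is nothing to wait for.
firstOf :: Int -> IO a -> IO a
firstOf n action = do
    handles <- replicateM n (async action)
    (_, result) <- waitAnyCancel handles
    return result

main :: IO ()
main = firstOf 3 stubSearch >>= putStrLn
```

The program prints the search result, but nothing in the output says which of the three copies produced it.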

When we used race it was easy to identify the faster replica because we could pattern match on the returned Either value. waitAnyCancel does return the winning Async handle alongside the result, but the handle carries no replica number, so it does not directly tell us which server was the fastest.

Now, because IO is a functor, we can use fmap (or its infix synonym <$>) to apply a function to the result of the search operation. Let's write a function which prepends the server's identity to the search result:

-- prepend the server's identity to the search result
servedBy :: Int -> String -> String
servedBy i result = "Server " ++ show i ++ ": " ++ result

For example, in case of the second server replica the transformed search action would read: servedBy 2 <$> fakeSearch query kind. The type of this expression is still IO String but it yields the search result with the server name prepended.
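Here is a tiny sketch showing that <$> and fmap produce the same transformed action. cannedSearch is a hypothetical placeholder for fakeSearch query kind that returns immediately.

```haskell
-- Prepend the server's identity to the search result.
servedBy :: Int -> String -> String
servedBy i result = "Server " ++ show i ++ ": " ++ result

-- Hypothetical placeholder for `fakeSearch query kind`.
cannedSearch :: IO String
cannedSearch = return "some result"

main :: IO ()
main = do
    viaOperator <- servedBy 2 <$> cannedSearch       -- infix form
    viaFmap     <- fmap (servedBy 2) cannedSearch    -- prefix form
    putStrLn viaOperator                 -- prints "Server 2: some result"
    print (viaOperator == viaFmap)       -- prints True
```

Neither form runs the search at the point where it is written; both only describe a new IO action whose result is the labelled string.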

Let's go over the series of transformation steps and see how the expressions and their types were modified:

-- the search API
fakeSearch :: SearchQuery -> SearchKind -> IO String

-- fill in the arguments to get an IO action that yields a string
fakeSearch query kind :: IO String

-- prepend the second replica's identity to the search result
servedBy 2 <$> fakeSearch query kind :: IO String

-- a function which prepends the provided replica number to the search result
\i -> servedBy i <$> fakeSearch query kind :: Int -> IO String

-- same as before but as an asynchronous action
\i -> async (servedBy i <$> fakeSearch query kind) :: Int -> IO (Async String)

In the last two lambda-expressions I kept the replica number as a free parameter, a form we can reuse in the final step.

Final implementation

Let's assemble the new fastest implementation from the pieces we have:

fastest :: SearchQuery -> SearchKind -> IO String
fastest query kind = do
    requests <- forM [1..numReplicas] $ \i ->                           -- ①
        async (servedBy i <$> fakeSearch query kind)  -- <$> is `fmap`  -- ②
    (_, result) <- waitAnyCancel requests                               -- ③
    return result                                                       -- ④

  where numReplicas = 3
        servedBy i result = "Server " ++ show i ++ ": " ++ result

① Use forM to transform a list of integers, the replica identifiers, by applying the provided function. requests has type [Async String], exactly what waitAnyCancel needs.

② The second argument of forM is the transformation function, which creates an asynchronous operation yielding the search result with the replica's identity prepended.

③ Call waitAnyCancel and extract the result from the fastest search operation (we don't use the first element of the returned tuple).

④ Return the result as the outcome of the current IO operation.

This implementation, only six lines of code, works with any number of replicas. For numReplicas = 2 it behaves like the old one, apart from the extra space in the server label ("Server 1: " instead of "Server1: ").
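To try the idea without the rest of the search engine, here is a self-contained variation. It is a sketch under assumptions, not the code from the repository: fastestOf takes the replicas as a list of IO actions instead of a replica count, and delayed is a hypothetical stand-in for fakeSearch with a fixed delay so that the winner is predictable.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, waitAnyCancel)
import Control.Monad (forM)

-- Prepend the server's identity to the search result.
servedBy :: Int -> String -> String
servedBy i result = "Server " ++ show i ++ ": " ++ result

-- Hypothetical stand-in for a search replica with a fixed delay.
delayed :: Int -> String -> IO String
delayed micros answer = threadDelay micros >> return answer

-- Number the replicas from 1, start them all, keep the first answer.
-- Assumes a non-empty list of replicas.
fastestOf :: [IO String] -> IO String
fastestOf replicas = do
    requests <- forM (zip [1 ..] replicas) $ \(i, search) ->
        async (servedBy i <$> search)
    (_, result) <- waitAnyCancel requests
    return result

main :: IO ()
main = do
    -- The second replica is by far the quickest (10 ms).
    answer <- fastestOf
        [ delayed 300000 "cats"
        , delayed 10000  "cats"
        , delayed 200000 "cats"
        ]
    putStrLn answer
```

With these delays the second replica should win, so the output should start with "Server 2: ". Passing the replicas as a list is only one possible design; it makes it easy to mix actions that talk to different servers.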

Summary

We replaced the original implementation of the fastest function, which supported only two server replicas, with a more general one that works with any number of replicas.

The implementation relies on a single library function, waitAnyCancel, but we needed to arrange the asynchronous search operations in a way that is compatible with waitAnyCancel's type signature. We used generic combinators such as fmap and forM to achieve this. The search30 function requires no modifications, as the type signature of fastest didn't change.

We wrote concurrent code with no locks, no mutexes and no concurrent design patterns to remember. We manipulated asynchronous computations as values and called library functions. This is possible because the async library exposes generic and composable primitives and hides the complexity of thread management. The language also provides powerful combinators such as fmap for implementing our programs.

You can find the examples given here as executable code on GitHub.

The inspiration for this and the previous post came from the Go examples presented in Rob Pike's Concurrency is Not Parallelism talk.