MapReduce Part One

We introduce the MapReduce paradigm by building a single-node parallel data processing system.

In part one we begin by creating a simple MapReduce system that runs on only one node. In part two, we'll adjust the system to work with multiple nodes in a cluster. Finally, in part three, we'll add replication and automatic fail-over when nodes crash.

Complete code for this tutorial is available in the GitHub repository.

The MapReduce Paradigm

MapReduce is a way of performing batch data processing in parallel. The workload is split up among many workers, each of which performs a computation (the map() function) on its portion of the data. Each worker passes on its results, which are aggregated by a reduce() function. The final result is a single data value that represents the computation across the entire data set (which usually means across many computers, but in our simple case we're using many workers all running on a single machine). In the real world, you can process just about any kind of data using this technique, but we're going to stick to lists of integers, for simplicity.

Continuing with this theme of simplicity, we'll keep the architecture of the system very simple as well. The two main components will be a server module, called mrs (short for MapReduce Server), which coordinates activities, and a worker module, called worker, which is responsible for storing lists of integers and performing calculations on them. The relationship between the server and the workers will be one-to-many; we'll start only one server, which will then manage several workers.

The Worker

Let's begin by looking at the worker.erl module, which contains all of the API and internal code for the worker processes (full source code available here). It's simpler than the server code.

The worker operates on a simple server loop containing a receive block. The only client API call required is a new() function to spawn a new worker process with an empty list of integers as its initial state:
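The full module is in the repository; a minimal sketch of new/0 might look like the following, assuming the worker announces itself with a {register, Pid} message (the exact message shape is an assumption for illustration):

    -module(worker).
    -export([new/0]).

    new() ->
        %% Spawn the worker loop with an empty list of integers as its state.
        Pid = spawn(fun() -> loop([]) end),
        %% Announce the new worker to the (already running) mrs server.
        mrs ! {register, Pid},
        Pid.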

The new() call also registers the new worker process with the mrs server, making the entire initialization process for a new worker automatic. When we look at the server, we'll see exactly what happens during worker registration.

Inside the server loop for the worker, we have a receive block with several message types that represent different actions the worker can perform. The simplest is to reset the worker, clearing its data list:
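Sketched as the first clause of that receive block (the {reset} message shape is an assumption):

    loop(Numbers) ->
        receive
            {reset} ->
                %% Throw away the stored integers and continue with an empty list.
                loop([]);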

Next, we may ask the worker to print out its current list of integers (data):
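Another clause of the same receive block might look like this (output format is illustrative):

            {print} ->
                io:format("Worker ~p holds: ~w~n", [self(), Numbers]),
                loop(Numbers);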

And to store a new integer in the list:
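Again as a sketch, assuming a {store, Int} message:

            {store, Int} ->
                %% Prepend the new integer to the head of the list.
                loop([Int|Numbers]);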

Note that we prepend the new integer, Int, to the head of the list: [Int|Numbers]. This is an important technique to use in Erlang, due to the way lists are implemented. Appending items to the tail of a list is a much more costly operation.

Finally, the worker must be able to accept a map function and apply that function to each item in its data list, then return the results to the server:
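A sketch of that clause, assuming the worker replies to the server with a {map_result, Pid, Results} message (the reply shape in worker.erl may differ):

            {map, Map} ->
                Results = lists:map(Map, Numbers),
                mrs ! {map_result, self(), Results},
                loop(Numbers);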

The Server

Now let's look at the server, which will act as the command and control mechanism for a small army of parallel workers. First, the client API, which contains a few simple functions for instructing the server to perform basic operations:
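A sketch of what the top of the mrs module might look like; the message tuples here are illustrative, not necessarily the exact ones in the repository:

    -module(mrs).
    -export([start/0, print/0, reset/0, store/1, rebalance/0, mapreduce/2]).

    %% Simple client API: each call just sends a message to the registered server.
    print()     -> mrs ! {print}.
    reset()     -> mrs ! {reset}.
    store(Int)  -> mrs ! {store, Int}.
    rebalance() -> mrs ! {rebalance}.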

And the most important client API call: mapreduce(Map, Reduce), which takes as arguments a map() function and a reduce() function, commands the server to perform the MapReduce job, then awaits the result and returns it for display in the shell:
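Sketched with an assumed {mapreduce, Map, Reduce, Client} request and {result, Result} reply:

    mapreduce(Map, Reduce) ->
        mrs ! {mapreduce, Map, Reduce, self()},
        receive
            {result, Result} -> Result
        end.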

The server can be started by calling mrs:start():
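A minimal sketch of start/0, matching the startup procedure described below:

    start() ->
        Pid = spawn(fun() -> loop([]) end),  % start with zero workers
        register(mrs, Pid),                  % so clients can reach the server by name
        Pid.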

The startup procedure is simple: begin with zero workers (an empty list) and spawn the server process, then register it so that we can access it by name when invoking API calls later on.

With this server API in place, we're now ready to examine the server's internal loop. It's very similar to the internal loop of the worker: a receive block with various message patterns, dispatching commands and processing logic as required.

One of the first operations needed at runtime is registering worker processes. We print a helpful information message, invoke the (async) mrs:rebalance() API call to trigger data rebalancing across the workers, then prepend the new worker process ID to the list of workers and continue back into the loop.
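A sketch of that clause, matching the {register, Pid} message sent from worker:new/0 above:

    loop(Workers) ->
        receive
            {register, Pid} ->
                io:format("Registering new worker ~p~n", [Pid]),
                mrs:rebalance(),             % async: just queues a {rebalance} message
                loop([Pid|Workers]);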

The print operation lists some diagnostic information to the shell and then requests that each worker print its information as well:
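Sketched as another clause of the server loop (the diagnostic output is illustrative):

            {print} ->
                io:format("MapReduce server with ~p worker(s)~n", [length(Workers)]),
                lists:foreach(fun(W) -> W ! {print} end, Workers),
                loop(Workers);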

To reset the system, we simply dispatch a command to each worker instructing it to clear out its list of integers:
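A sketch of that clause:

            {reset} ->
                lists:foreach(fun(W) -> W ! {reset} end, Workers),
                loop(Workers);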

When storing new integer values in the system, we need to choose which worker should hold the value. To do so, we use a simple ring hash to choose the index of the worker in the Workers list, then send that worker a message commanding it to store the value:
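Sketched here with erlang:phash2/2 standing in for the hash (the repository code may hash differently):

            {store, Int} ->
                %% Hash the integer onto one of the workers (1-based index).
                Index = erlang:phash2(Int, length(Workers)) + 1,
                lists:nth(Index, Workers) ! {store, Int},
                loop(Workers);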

The final server action to look at is the mapreduce operation itself. The first part of a MapReduce job is to have each worker run the map function:
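Sketched as the opening of an assumed {mapreduce, Map, Reduce, Client} clause in the server loop:

            {mapreduce, Map, Reduce, Client} ->
                %% Ask every worker to run the map function over its data.
                lists:foreach(fun(W) -> W ! {map, Map} end, Workers),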

We then wait for each worker to reply with results:
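Continuing the sketch: a helper (here called collect_replies/2, an illustrative name) gathers one {map_result, Pid, Results} reply per worker into a dict keyed by worker pid:

                Replies = collect_replies(Workers, dict:new()),

    %% One reply is expected from each worker.
    collect_replies([], Acc) ->
        Acc;
    collect_replies([_|Rest], Acc) ->
        receive
            {map_result, Pid, Results} ->
                collect_replies(Rest, dict:store(Pid, Results, Acc))
        end.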

Next, we have to convert the replies from a dictionary structure into a list, for easier processing in the reduce phase:
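Continuing the sketch, flattening the per-worker result lists into a single list:

                MapResults = lists:flatten(
                                 [Results || {_Pid, Results} <- dict:to_list(Replies)]),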

We then run the reduce function, once only, to aggregate the map results into a single, final result value:
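In this sketch the reduce function receives the whole list of map results and returns a single value:

                Result = Reduce(MapResults),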

And, finally, we send the final result back to the client process that requested the MapReduce job, so it can be displayed:
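This is the {result, Result} message our mapreduce/2 API sketch is waiting on:

                Client ! {result, Result},
                loop(Workers);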

A Note About Rebalancing

Since the MapReduce paradigm is all about calculations on a distributed dataset, it wouldn't make sense to have some workers handling small or empty subsets of the data. I mentioned above that the server invokes a rebalance operation every time a new worker is added. Why? Because this forces the system to shuffle data around between the workers in an effort to keep the dataset evenly distributed. When the server calls its own mrs:rebalance() function, it sends itself a {rebalance} message:
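That's the same one-liner shown in the client API sketch above:

    rebalance() -> mrs ! {rebalance}.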

and here is how it handles that message:
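A sketch of how the server might handle it, assuming it forwards its current worker list so each worker can decide which integers it should keep, plus a clause for the {purged_data, Items} replies described next:

            {rebalance} ->
                %% Tell every worker which workers exist so it can re-hash its data.
                lists:foreach(fun(W) -> W ! {rebalance, Workers} end, Workers),
                loop(Workers);
            {purged_data, Items} ->
                %% Integers a worker gave up are simply stored again, which re-hashes them.
                lists:foreach(fun(I) -> mrs:store(I) end, Items),
                loop(Workers);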

It sends each of its workers a command to rebalance the data. Each worker scans its stored list of integers and returns any integers that must be moved to a different worker. The server then takes the {purged_data, Items} responses and stores them back in the cluster, so that they (eventually) end up stored by a different worker process. Here's the code that the worker uses to fulfill the rebalance command:
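A sketch of that clause in the worker's receive block, using the same hash as the server's store operation so an integer is only handed back when it no longer maps to this worker:

            {rebalance, Workers} ->
                BelongsHere = fun(Int) ->
                    Index = erlang:phash2(Int, length(Workers)) + 1,
                    lists:nth(Index, Workers) =:= self()
                end,
                {Keep, Move} = lists:partition(BelongsHere, Numbers),
                mrs ! {purged_data, Move},   % hand the misplaced integers back to the server
                loop(Keep);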

MapReduce Algorithms

With the server and worker code complete, the final component we need in order to use the MapReduce system is a collection of algorithms to carry out meaningful computations over the data set. Each MapReduce algorithm we write will have the same format inside an API call: we'll define a map function, then define a reduce function, then invoke mrs:mapreduce(Map, Reduce) to run the computation.

For example, to count the number of integers stored in our system, our map function will output 1 regardless of the stored value, then our reduce function will count how many ones were emitted:
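A sketch of what such an algorithm might look like in compute.erl (the function names are illustrative):

    -module(compute).
    -export([count/0, sum/0, mean/0]).

    count() ->
        Map    = fun(_Int) -> 1 end,              % emit a 1 for every stored integer
        Reduce = fun(Ones) -> length(Ones) end,   % count how many ones were emitted
        mrs:mapreduce(Map, Reduce).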

An equivalent reduce operation would be to take the sum of the ones emitted from the map phase.

An algorithm to sum the integers stored in the system is similar, except that the map phase emits each integer:
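A sketch:

    sum() ->
        Map    = fun(Int) -> Int end,              % emit each integer unchanged
        Reduce = fun(Ints) -> lists:sum(Ints) end,
        mrs:mapreduce(Map, Reduce).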

Computing the mean of the data set requires a more complicated reduce function:
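Sketched here by combining a sum and a count in the reduce step (this version would crash on an empty data set; the one in compute.erl may guard against that):

    mean() ->
        Map    = fun(Int) -> Int end,
        Reduce = fun(Ints) -> lists:sum(Ints) / length(Ints) end,
        mrs:mapreduce(Map, Reduce).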

You can see a few other complete algorithms in the compute.erl module.

Complete source code, scripts, and instructions for running the system are available in the GitHub repository.

Running The System

The source repository contains bootstrapping scripts to start the server, create a few workers, and store some integers. To start the system, run:

> ./start_server.sh

Once the system is running, we can view the data by calling mrs:print(), store a new number by calling mrs:store(999), and run computations such as finding the sum by calling compute:sum() (remember that each call typed into the Erlang shell must be terminated with a period). Check out the compute.erl module and try writing some more algorithms of your own.

Exercises

Part Two: Distributing The System

In the next tutorial, we'll change the system to operate as a cluster across multiple nodes (i.e., physical machines).