MapReduce Part Two

We expand the MapReduce system from part one into a multi-node parallel data processing system.

In part one, we built a simple MapReduce system that runs on a single node. Here in part two, we'll adapt the system to work with multiple nodes in a cluster. Finally, in part three, we'll add replication and automatic fail-over when nodes crash.

Complete code for this tutorial is available in the GitHub repository.

Refresher: The MapReduce Paradigm

MapReduce is a way of performing batch data processing in parallel. The workload is split among many workers, each of which performs a computation (the map() function) on its portion of the data. Each worker passes on its results, which are aggregated by a reduce() function. The final result is a single value that represents the computation across the entire data set. In the real world, you can process just about any kind of data using this technique, but for simplicity we're going to stick to lists of integers.
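
To make the paradigm concrete, here is a toy sketch that is not part of the tutorial's code: it squares and sums lists of integers, with one spawned process per chunk standing in for the workers.

    -module(mr_toy).
    -export([sum_of_squares/1]).

    %% "Map" phase: one process per chunk squares and sums its integers,
    %% then sends the partial result back to the caller.
    sum_of_squares(Chunks) ->
        Parent = self(),
        [spawn(fun() -> Parent ! {partial, lists:sum([X * X || X <- C])} end)
         || C <- Chunks],
        collect(length(Chunks), 0).

    %% "Reduce" phase: aggregate one partial result per chunk into a single value.
    collect(0, Acc) -> Acc;
    collect(N, Acc) ->
        receive
            {partial, P} -> collect(N - 1, Acc + P)
        end.

Calling mr_toy:sum_of_squares([[1, 2, 3], [4, 5]]) returns 55, regardless of the order in which the partial results arrive.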

Since we have already built a stand-alone MapReduce system, it's time to extend it into something that will operate as a cluster of identical nodes, or peers, collaborating on distributed computations. Because each peer has the same capabilities, we won't have to worry about keeping track of 'primary' or 'secondary' nodes. Instead, we'll see that it's easy to send a request to any peer in the cluster and receive the correct response.

Adding Distribution To The System

To achieve this, we'll use a technique called "resource discovery", popularized in section 8.3 of the excellent book Erlang and OTP In Action. Following this pattern, we'll avoid hard-coding locations and make it easy to add nodes to the cluster by treating each MapReduce node as a service which can be registered with, and located from, a shared registration service.

Another handy side-effect of this approach is that we won't have to make any changes whatsoever to our worker.erl module. The original version of the worker already contains everything we need to operate in a multi-node environment.

A Crash Course In Resource Discovery

For our purposes, think of a resource as any service that we want other nodes to have access to. In this case, we only have a single type of resource: the MapReduce service. Each node that participates in the system will advertise its availability and also request MapReduce services from its peers.

Bootstrapping The Cluster

Connections between nodes in an Erlang cluster are transitive. That is, if A connects to B and B connects to C, then A will also be able to communicate with C. We'll take advantage of this property to make it easy to join the cluster.
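
For example, in a shell on node a (the node names below are placeholders), pinging b is enough to learn about every node that b is already connected to:

    (a@localhost)1> net_adm:ping('b@localhost').
    pong
    (a@localhost)2> nodes().
    ['b@localhost','c@localhost']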

To do so, we'll use two types of nodes, which we'll call contact nodes and server nodes. The GitHub repository contains scripts for both Windows and Ubuntu to start each type of node. You should start one contact node named "contact1" in a terminal and leave it open for the duration of your work:

> ./start_contact_node.sh contact1

This contact node is just an empty Erlang system with none of our code loaded. Later, we'll open more terminals and start a server node in each. The server nodes will automatically connect to the contact node by name, thus joining the cluster and seeing the other running peers.
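
If you want to see what the script boils down to, a contact node is essentially a bare, named Erlang shell. The -setcookie value below is only illustrative; the nodes simply need to share the same cookie, however the repository's scripts arrange that:

    > erl -sname contact1 -setcookie mapreduce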

Modifying The Server

It's time to dig into the code changes, starting with the mrs.erl module. First, let's look at the server's start function:
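
The full function is in the repository; the sketch below is a reconstruction that assumes the resource discovery module is the resource_discovery application from Erlang and OTP In Action, that the server loop from part one is registered under the ?SERVER macro, and that the wait is a simple sleep of arbitrary length:

    start() ->
        %% Part one's startup, abbreviated: spawn the server loop and
        %% register it under the ?SERVER name.
        Pid = spawn(?MODULE, loop, [[]]),
        register(?SERVER, Pid),
        %% Advertise our own MapReduce service...
        resource_discovery:add_local_resource(?SERVER, Pid),
        %% ...ask to be told about everyone else's...
        resource_discovery:add_target_resource_type(?SERVER),
        %% ...and trade that information with the rest of the cluster.
        resource_discovery:trade_resources(),
        %% Give the other nodes a moment to respond.
        timer:sleep(2000),
        ok.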

It begins with the same three lines as before to start the server and register it, but then it performs the two resource discovery tasks described above. The call add_local_resource(?SERVER, Pid) advertises that this instance is providing the service, and add_target_resource_type(?SERVER) asks to be informed of other peers offering a MapReduce service. Finally, trade_resources() triggers the network communication required to exchange those requests with the other nodes in the cluster. Since the world is asynchronous, we simply wait a moment for the other nodes to respond, and then the startup process, including resource discovery, is complete.

The next change enables the server to communicate with its peers. Whereas before all communication was performed via direct function calling or messages passed with the ! operator, now we need to send messages to all peers and collect their replies in order to coordinate computations:
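
A minimal sketch of such a broadcast helper, assuming the book's resource_discovery:fetch_resources/1 call for reading the local cache (the helper's name here is illustrative):

    %% Send Message to the registered ?SERVER process on every known peer,
    %% including this node, and report how many peers were messaged.
    broadcast(Message) ->
        {ok, Peers} = resource_discovery:fetch_resources(?SERVER),
        [Pid ! Message || Pid <- Peers],
        length(Peers).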

Messaging the entire cluster is straightforward: get the list of all peers (including the current node) from the local resource cache, then send each a direct message. Finally, return the number of peers messaged, in case it comes in handy for debugging.

Modified Computation Algorithms

There are very few changes required to the mapreduce feature in order to make it work in a cluster. The biggest change is that rather than await a single reduce reply, we now need to collect replies from all of the peers in the cluster. Fortunately, this mirrors the process of collecting map replies from workers, which we looked at in part one.
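
A sketch of that collection loop, with illustrative names and an assumed {reduce_reply, Value} message shape: gather one reply per peer into a list, then let the caller reduce the list as before.

    collect_reduce_replies(0, Replies) ->
        Replies;
    collect_reduce_replies(NumPeers, Replies) ->
        receive
            {reduce_reply, Value} ->
                collect_reduce_replies(NumPeers - 1, [Value | Replies])
        end.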

One extra feature added to the system is support for a finalize function, which is applied to the aggregated reduce result as a final step in the computation. This is handy, for example, when computing the mean of the values stored in the distributed system:
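
The exact mrs API isn't reproduced here, but as a sketch, assuming a three-argument mrs:mapreduce(Map, Reduce, Finalize) call, the mean could be expressed like this:

    %% Sketch only: the mrs:mapreduce/3 signature is an assumption.
    Map = fun(N) -> {N, 1} end,                                  % each integer becomes {Value, 1}
    Reduce = fun({S1, C1}, {S2, C2}) -> {S1 + S2, C1 + C2} end,  % sum the values and the counts
    Finalize = fun({Sum, Count}) -> Sum / Count end,             % the final division step
    mrs:mapreduce(Map, Reduce, Finalize).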

The aggregated result is a tuple {Sum, Count}, and in the finalize phase we simply perform a division to produce the mean value we were looking for. For many simple algorithms, nothing needs to be done during finalize, so a default is provided:
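
A natural default is the identity function, so that two-argument computations behave exactly as before. A sketch, assuming the entry point is named mapreduce:

    %% The two-argument form simply supplies an identity finalize.
    mapreduce(Map, Reduce) ->
        mapreduce(Map, Reduce, fun(Result) -> Result end).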

Running The System

Running the updated system is similar to the single-node operation in Part One of this series, except that we'll need to run a few Erlang nodes at a time. The easiest way to do this is to open a separate terminal window for each node.

To start, you should already have a contact node named contact1 running, as described in "Bootstrapping The Cluster" above. Now we can start two server nodes, each in its own terminal window. In the first terminal window, run:

> ./start_server.sh server1

In the second window, start server2 by running:

> ./start_server.sh server2

You should now have one contact node and two server nodes running on your local computer. You can view the data stored in the cluster by calling mrs:print() in either server. Storing integers and running computations works exactly as before, but note that you can perform these operations in both server nodes! Each server is a fully functioning peer in the system.
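
For example, you can store values in one server's shell and inspect the cluster from the other (mrs:store/1 is illustrative here; use whatever storage call your part-one code provides):

    %% In the server1 shell:
    (server1@localhost)1> mrs:store(4).
    (server1@localhost)2> mrs:store(6).

    %% In the server2 shell, the same cluster data is visible:
    (server2@localhost)1> mrs:print().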

Exercises