MapReduce Part Three

We add data replication and automatic fail-over to the multi-node parallel data processing system.

In part one we began by creating a simple MapReduce system that runs on only one node. In part two we adjusted the system to work with multiple nodes in a cluster. Here, in part three, we'll add data replication and automatic fail-over when nodes crash.

Complete code for this tutorial is available in the GitHub repository.

Recap: What We've Done So Far

This tutorial series began by creating a simple parallel data processing system that ran only on a single machine. In that system, we created worker processes to store a set of integers and run calculations over that dataset using the MapReduce paradigm. We then adjusted the system to run on multiple computers, or nodes, in a fully peer-to-peer topology. The decision not to use a master/slave arrangement will now come in handy as we add data replication and automatic fail-over to the distributed system. This will allow the cluster to maintain the data hosted by a node that crashes.

On most platforms, adding a replication feature like this would be a serious undertaking. Fortunately, Erlang's native message passing and crash signaling between physical nodes will do virtually all of the heavy lifting for us.

Replication Basics

In this simple replication scheme, each peer will randomly select a replication partner. The peer will send its partner a full copy of its partition of the data set and, in return, the partner will monitor the health of the peer. If the peer crashes, the partner will be notified and will bring that peer's data into its own partition, thus preserving the portion of the dataset that would have been lost when the peer crashed.

To make clear what's going on, let's anthropomorphize our servers and pretend that Alice and Bob are MapReduce servers having a conversation about replication:

At this point, Bob is now responsible for all of Alice's data, and the cluster has one fewer node (since Alice has died), so any future writes will be distributed evenly among the remaining peers (including Bob). You'll also note that in the skit above, Alice prudently kept her replica with Bob up-to-date when she added new values to her copy of the data. This is an important feature. In our system, the initial replication command will send the entire data partition to the replica, and we'll update that replica incrementally by sending new values to the replication partner as they are written.
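To make the exchange concrete, here is a toy sketch of the Alice and Bob conversation as two Erlang processes. It is not the tutorial's server code: the module name alice_bob, the message shapes {replica, Peer, Numbers} and {update, Int}, and the sample integers are all assumptions made for illustration.

```erlang
-module(alice_bob).
-export([demo/0]).

%% Bob holds his own data plus a replica from one monitored peer.
%% (Hypothetical message shapes; not the tutorial's actual protocol.)
bob(Data, Replica, Report) ->
    receive
        {replica, Peer, Numbers} ->
            %% Full copy received: start watching the peer's health.
            erlang:monitor(process, Peer),
            bob(Data, Numbers, Report);
        {update, Int} ->
            %% Incremental update: keep the replica current.
            bob(Data, [Int | Replica], Report);
        {'DOWN', _Ref, process, _Peer, _Reason} ->
            %% The peer died: fold its replica into Bob's own data.
            Report ! {bob_now_holds, Replica ++ Data}
    end.

%% Alice sends her full partition, then one new value, then crashes.
alice(Bob) ->
    Bob ! {replica, self(), [1, 2, 3]},
    Bob ! {update, 4},
    exit(crashed).

demo() ->
    Me = self(),
    Bob = spawn(fun() -> bob([10, 20], [], Me) end),
    spawn(fun() -> alice(Bob) end),
    receive
        {bob_now_holds, Data} -> Data
    after 1000 -> timeout
    end.
```

Calling alice_bob:demo() returns the data Bob holds after Alice's crash, which includes both her original partition and the value she added later.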


In order to build a full-blown replication system in our MapReduce cluster, we need to add three small features:

  1. To begin replication, the server needs to choose a peer and have all of its workers send their data to the chosen partner.
  2. When a worker adds an integer to its list, it must notify the replication partner that the replica has changed.
  3. A partner holding a replica dataset must monitor the original peer. If that original peer crashes, the replica will be brought into the main data set hosted at the partner node.

1) Kicking off Replication

Once the cluster is up and running (see the previous tutorial for instructions), we'll turn replication on using the replicate() API call:
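As a rough sketch of what such an API call could look like: the function below takes the list of server pids as an argument so that it stays self-contained. The tutorial's replicate() takes no arguments, and how it looks up the servers is not shown here, so the module name replicate_api and the explicit Servers parameter are assumptions.

```erlang
-module(replicate_api).
-export([replicate/1]).

%% Ask every server in the cluster to begin replication.
%% Servers is a list of server pids (hypothetical parameter; the real
%% API call would find the servers itself).
replicate(Servers) ->
    lists:foreach(fun(Server) -> Server ! {replicate} end, Servers),
    ok.
```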

Each server will receive a {replicate} message in its main server loop. When that happens, it randomly selects another peer then commands all of its workers to send that peer their lists of integers:
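One way the partner selection and the command to the workers might be written is sketched below. The function names choose_partner/2 and handle_replicate/3, the {replicate_to, Partner} message, and the use of the rand module are assumptions for illustration, not the tutorial's actual code.

```erlang
-module(replicate_server).
-export([choose_partner/2, handle_replicate/3]).

%% Pick a random peer other than ourselves, or 'none' if we are alone.
choose_partner(Self, Peers) ->
    case lists:delete(Self, Peers) of
        [] ->
            none;
        Others ->
            lists:nth(rand:uniform(length(Others)), Others)
    end.

%% Handle a {replicate} message: choose a partner and tell every local
%% worker to send its data there. Returns the chosen partner.
handle_replicate(Self, Peers, Workers) ->
    case choose_partner(Self, Peers) of
        none ->
            none;
        Partner ->
            lists:foreach(fun(W) -> W ! {replicate_to, Partner} end, Workers),
            Partner
    end.
```

Excluding the server itself from the candidates matters: a replica stored on the same node would be lost in the same crash.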

Each worker handles that command by simply sending its list to the destination server:
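A minimal worker loop along these lines, assuming the worker keeps its integers and its current partner as loop state, could look like the following. The module name replicate_worker and the {replicate_to, Partner}, {replica, Pid, Numbers}, and stop messages are hypothetical.

```erlang
-module(replicate_worker).
-export([start/1, loop/2]).

%% Start a worker holding Numbers, with no replication partner yet.
start(Numbers) ->
    spawn(?MODULE, loop, [Numbers, undefined]).

loop(Numbers, Partner) ->
    receive
        {replicate_to, NewPartner} ->
            %% Send the full list, tagged with our pid so the partner
            %% knows whose replica this is, and remember the partner.
            NewPartner ! {replica, self(), Numbers},
            loop(Numbers, NewPartner);
        stop ->
            ok;
        _Other ->
            loop(Numbers, Partner)
    end.
```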

Finally, when the replication partner (that is, the destination server) receives the replica, it stores a tuple containing the worker process's Pid and the list of integers in another list variable named Replicas:
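The storing step can be sketched as a small function that returns the updated Replicas list. The name handle_replica/3 and the module name replica_store are assumptions; the {Pid, List} tuple shape and the erlang:monitor/2 call follow the description in this tutorial.

```erlang
-module(replica_store).
-export([handle_replica/3]).

%% Record a worker's replica and start monitoring that worker.
%% Replicas is a list of {WorkerPid, Numbers} tuples.
handle_replica(WorkerPid, Numbers, Replicas) ->
    %% We will get a 'DOWN' message if the worker dies.
    erlang:monitor(process, WorkerPid),
    %% keystore replaces an existing entry for this worker, or appends
    %% a new one. In this sketch a repeated replica from the same
    %% worker also adds a second monitor.
    lists:keystore(WorkerPid, 1, Replicas, {WorkerPid, Numbers}).
```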

When all of this message passing has completed for all servers, replication is up and running throughout the cluster.

2) Incrementally Updating Replicas

To enable incremental updates, each worker must notify the replica host of new additions to the data by calling send_to_replica/2:

The send_to_replica/2 function is very compact. Note that its first clause matches when no destination server has been chosen yet and does nothing, because until replication is turned on the worker has nowhere to send updates:
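A sketch of how the two clauses might be written follows. It assumes the destination is the first argument, that an unset destination is represented by the atom undefined, and that updates travel as {update_replica, WorkerPid, Int} messages; none of these details are confirmed by the text. The add/3 helper is likewise hypothetical and only shows where the call would sit when a worker stores a new integer.

```erlang
-module(replica_update).
-export([send_to_replica/2, add/3]).

%% No partner chosen yet (replication is off): nothing to do.
send_to_replica(undefined, _Int) ->
    ok;
%% Replication is on: forward the new value to the partner.
send_to_replica(Partner, Int) ->
    Partner ! {update_replica, self(), Int},
    ok.

%% Store a new integer locally and keep the replica in step.
add(Int, Numbers, Partner) ->
    send_to_replica(Partner, Int),
    [Int | Numbers].
```

Handling the "no partner" case with its own clause keeps the worker's write path free of conditionals.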

3) Monitoring Peers for Crashes

The last step is for each replication partner to monitor the originating peer in case it crashes. The first step in this process is to use the process monitoring feature in Erlang to turn on monitoring of each process that sends us a replica. Although not mentioned above, we saw this already; it's the call to erlang:monitor/2:

With monitoring of the worker processes enabled, the replication partner can receive messages indicating that workers have died. When that happens, the replica from the crashing worker is located in the list of replicas and stored in the main dataset:
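The fail-over step can be sketched as follows. Erlang delivers a {'DOWN', Ref, process, Pid, Reason} message to the monitoring process when a monitored process dies. The module name replica_failover and the function handle_down/3 are hypothetical, and the main dataset is represented as a plain list for simplicity, which may differ from how the tutorial's server actually stores it.

```erlang
-module(replica_failover).
-export([handle_down/3, demo/0]).

%% A monitored worker died: move its replica (if we hold one) into the
%% local data. Returns {NewNumbers, RemainingReplicas}.
handle_down(Pid, Numbers, Replicas) ->
    case lists:keytake(Pid, 1, Replicas) of
        {value, {Pid, Replica}, Rest} ->
            {Replica ++ Numbers, Rest};
        false ->
            {Numbers, Replicas}
    end.

%% Small demonstration: monitor a worker, kill it, and recover its data.
demo() ->
    Worker = spawn(fun() -> receive stop -> ok end end),
    Ref = erlang:monitor(process, Worker),
    Replicas = [{Worker, [1, 2, 3]}],
    exit(Worker, kill),
    receive
        {'DOWN', Ref, process, Worker, _Reason} ->
            handle_down(Worker, [10, 20], Replicas)
    after 1000 ->
        timeout
    end.
```

Removing the entry from Replicas once it has been promoted ensures the same data cannot be merged twice.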

Using and Testing Replication

With the cluster running, you can easily test replication and fail-over. With at least two nodes running, run some computations as we did before (e.g. compute:sum()). Then, turn on replication by calling mrs:replicate() at any node. You'll see lots of diagnostic messages printed in each node as the replicas are sent around the cluster. Finally, to test fail-over, you can manually kill individual worker processes or kill several at a time by stopping one of the nodes. In the remaining nodes, you'll see diagnostic output showing the fail-over as it happens. Now run some computations again to ensure that you're getting the same values as before.