CS 241

MP5: MapReduce


IMPORTANT

You will be using fork() in this MP. You should understand what it means to fork bomb.

If you fork bomb the autograder, you will get a 0 for this MP. No exceptions!

Make sure to always run all five testers before committing your code.

Introduction

In 2004, Google developed a general framework for processing large data sets on clusters of computers. We recommend reading the MapReduce article on Wikipedia for background information. However, we explain everything you need to know below.

To explain what MapReduce does, we'll use a small dataset (the first few lines of a famous poem by Robert Frost):

Two roads diverged in a yellow wood,
And sorry I could not travel both
And be one traveler, long I stood
And looked down one as far as I could.


To run MapReduce, we first split the data set into small pieces. For this example, we split it into the four lines of the poem:
Data Set #1
Input: "Two roads diverged in a yellow wood,"
Data Set #2
Input: "And sorry I could not travel both"
Data Set #3
Input: "And be one traveler, long I stood"
Data Set #4
Input: "And looked down one as far as I could."


As part of the input to any MapReduce program, the user provides a map() function. This function maps the input into a series of (key, value) pairs. For this example, let the map() function simply count the occurrences of each letter (a-z, ignoring case) in the data set.
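As a rough illustration of the letter-counting map() described above, the sketch below tallies letters in a string. The function name count_letters and its array-based interface are assumptions made for this example only; the map() functions in this MP may have a different signature and may emit their output differently.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper (not part of the MP's API): count how often each
 * letter a-z occurs in `text`, ignoring case. counts[0] holds 'a' and
 * counts[25] holds 'z'. Spaces, commas, and other characters are skipped. */
void count_letters(const char *text, int counts[26])
{
    assert(text != NULL && counts != NULL);
    memset(counts, 0, 26 * sizeof counts[0]);

    for (const char *p = text; *p != '\0'; p++) {
        int c = tolower((unsigned char)*p);
        if (c >= 'a' && c <= 'z') {
            counts[c - 'a']++;
        }
    }
}
```

Calling count_letters() on the first line of the poem should give o: 5 and d: 4, matching the map() output listed for Data Set #1.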

MapReduce spawns one process per data set and runs the map() function on each data set:
Data Set #1
Input: "Two roads diverged in a yellow wood,"
pid = 1001
map() Output: a: 2, b: 0, c: 0, d: 4, e: 3, f: 0, g: 1, h: 0, i: 2, j: 0, k: 0, l: 2, m: 0, n: 1, o: 5, p: 0, q: 0, r: 2, s: 1, t: 1, u: 0, v: 1, w: 3, x: 0, y: 1, z: 0
Data Set #2
Input: "And sorry I could not travel both"
pid = 1002
map() Output: a: 2, b: 1, c: 1, d: 2, e: 1, f: 0, g: 0, h: 1, i: 1, j: 0, k: 0, l: 2, m: 0, n: 2, o: 4, p: 0, q: 0, r: 3, s: 1, t: 3, u: 1, v: 1, w: 0, x: 0, y: 1, z: 0
Data Set #3
Input: "And be one traveler, long I stood"
pid = 1003
map() Output: a: 2, b: 1, c: 0, d: 2, e: 4, f: 0, g: 1, h: 0, i: 1, j: 0, k: 0, l: 2, m: 0, n: 3, o: 4, p: 0, q: 0, r: 2, s: 1, t: 2, u: 0, v: 1, w: 0, x: 0, y: 0, z: 0
Data Set #4
Input: "And looked down one as far as I could."
pid = 1004
map() Output: a: 4, b: 0, c: 1, d: 4, e: 2, f: 1, g: 0, h: 0, i: 1, j: 0, k: 1, l: 2, m: 0, n: 3, o: 5, p: 0, q: 0, r: 1, s: 2, t: 0, u: 1, v: 0, w: 1, x: 0, y: 0, z: 0
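The one-process-per-data-set step above can be sketched with fork() and pipe(). This is only one possible arrangement, not the required design: toy_map, map_all, MAX_SETS, and the choice of one pipe per child are all assumptions made for illustration. Note that the child calls _exit() as soon as its map work is done, so it can never fall back into the loop and fork again.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define MAX_SETS 64  /* arbitrary limit chosen for this sketch */

/* Hypothetical stand-in for a user-supplied map(): writes the length of
 * `data` to `fd` as decimal text. Returns 0 on success, -1 on failure. */
int toy_map(int fd, const char *data)
{
    char buf[32];
    int len = snprintf(buf, sizeof buf, "%zu\n", strlen(data));
    return write(fd, buf, (size_t)len) == (ssize_t)len ? 0 : -1;
}

/* Fork one child per data set; each child runs toy_map() into its own pipe.
 * The parent reads every pipe, reaps every child, and returns the sum of
 * the reported values, or -1 if any pipe() or fork() call failed. */
long map_all(const char **datasets, int n)
{
    assert(datasets != NULL && n > 0 && n <= MAX_SETS);
    int read_fds[MAX_SETS];
    pid_t pids[MAX_SETS];
    int started = 0;

    for (int i = 0; i < n; i++) {
        int fds[2];
        if (pipe(fds) == -1)
            break;
        pid_t pid = fork();
        if (pid == -1) {
            close(fds[0]);
            close(fds[1]);
            break;
        }
        if (pid == 0) {
            /* Child: drop inherited read ends, run map(), then leave. */
            close(fds[0]);
            for (int j = 0; j < started; j++)
                close(read_fds[j]);
            int rc = toy_map(fds[1], datasets[i]);
            close(fds[1]);
            _exit(rc == 0 ? 0 : 1);  /* never return into the fork loop */
        }
        close(fds[1]);               /* parent keeps only the read end */
        read_fds[started] = fds[0];
        pids[started] = pid;
        started++;
    }

    long total = 0;
    for (int i = 0; i < started; i++) {
        char buf[32];
        size_t used = 0;
        ssize_t got;
        /* Read until EOF, which arrives once the child closes its end. */
        while (used < sizeof buf - 1 &&
               (got = read(read_fds[i], buf + used,
                           sizeof buf - 1 - used)) > 0)
            used += (size_t)got;
        buf[used] = '\0';
        total += strtol(buf, NULL, 10);
        close(read_fds[i]);
        waitpid(pids[i], NULL, 0);   /* reap the child; no zombies */
    }
    return started == n ? total : -1;
}
```

The parent closes each write end right after fork(), which is what lets read() see end-of-file when the child exits. Forgetting that close is a common cause of a parent that blocks forever.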


Each map() process produces its output, and that output becomes the initial input to the reduce() function. The reduce() function then collects and combines the output of each process to form a single data set to return to the user. The output of reduce() should be a value in the same domain as its inputs. In this example, reduce() is called every time the same key is seen from two different processes, and our reduce() function simply adds the two values for that key (e.g., a: 2 and a: 4 result in a: 6).
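A minimal sketch of such an adding reduce() is shown below. It assumes values are passed around as decimal strings and that the caller frees the result; both the name reduce_sum and that ownership rule are assumptions for this example, not the MP's stated interface.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical reduce(): parse two decimal values and return their sum as
 * a newly allocated string. The caller must free() the result. Returns
 * NULL if memory allocation fails. */
char *reduce_sum(const char *value1, const char *value2)
{
    assert(value1 != NULL && value2 != NULL);
    long sum = strtol(value1, NULL, 10) + strtol(value2, NULL, 10);

    /* First ask snprintf how many characters the sum needs. */
    int len = snprintf(NULL, 0, "%ld", sum);
    char *out = malloc((size_t)len + 1);
    if (out != NULL)
        snprintf(out, (size_t)len + 1, "%ld", sum);
    return out;
}
```

With this sketch, reduce_sum("2", "4") yields the string "6", which stays in the same domain as its inputs and can therefore be fed back into another reduce_sum() call.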

All reduce() calls are run in a single worker thread, within the process that called the MapReduce framework initially. The reduce() calls are not performed in a new or recycled map() process. Adding to our diagram:
Data Set #1
Input: "Two roads diverged in a yellow wood,"
pid = 1001
map() Output: a: 2, b: 0, c: 0, d: 4, e: 3, f: 0, g: 1, h: 0, i: 2, j: 0, k: 0, l: 2, m: 0, n: 1, o: 5, p: 0, q: 0, r: 2, s: 1, t: 1, u: 0, v: 1, w: 3, x: 0, y: 1, z: 0
Data Set #2
Input: "And sorry I could not travel both"
pid = 1002
map() Output: a: 2, b: 1, c: 1, d: 2, e: 1, f: 0, g: 0, h: 1, i: 1, j: 0, k: 0, l: 2, m: 0, n: 2, o: 4, p: 0, q: 0, r: 3, s: 1, t: 3, u: 1, v: 1, w: 0, x: 0, y: 1, z: 0
Data Set #3
Input: "And be one traveler, long I stood"
pid = 1003
map() Output: a: 2, b: 1, c: 0, d: 2, e: 4, f: 0, g: 1, h: 0, i: 1, j: 0, k: 0, l: 2, m: 0, n: 3, o: 4, p: 0, q: 0, r: 2, s: 1, t: 2, u: 0, v: 1, w: 0, x: 0, y: 0, z: 0
Data Set #4
Input: "And looked down one as far as I could."
pid = 1004
map() Output: a: 4, b: 0, c: 1, d: 4, e: 2, f: 1, g: 0, h: 0, i: 1, j: 0, k: 1, l: 2, m: 0, n: 3, o: 5, p: 0, q: 0, r: 1, s: 2, t: 0, u: 1, v: 0, w: 1, x: 0, y: 0, z: 0
pid = 1000, worker thread
reduce(), reduce(), reduce(), and more reduce()'ing
... ... ...
Result: a: 10, b: 2, c: 2, d: 12, e: 10, f: 1, g: 2, h: 1, i: 5, j: 0, k: 1, l: 8, m: 0, n: 9, o: 18, p: 0, q: 0, r: 8, s: 5, t: 6, u: 2, v: 3, w: 4, x: 0, y: 2, z: 0


For each unique key, reduce() is called repeatedly until the multiple values output by the map() processes are reduced to a single value for that key. As an example, consider the key 'a'. The order in which reduce() is called may depend on when the processes finish, but one possible sequence of operations is shown in the diagram below. (This depicts what happens only for the key 'a'; the other keys are handled similarly.)
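In code, the repeated reduction for a single key might look like the sketch below. The names reduce_key and add_values are hypothetical, and values are plain ints here for brevity. The first value to arrive is stored as-is, and every later value is combined with the running result.

```c
#include <assert.h>
#include <stddef.h>

/* Example reduce(): add the two values seen for the same key. */
int add_values(int a, int b)
{
    return a + b;
}

/* Hypothetical driver: fold the values for one key in the order they
 * arrived, calling reduce() once per value after the first. */
int reduce_key(const int *arrivals, size_t n, int (*reduce)(int, int))
{
    assert(arrivals != NULL && n > 0 && reduce != NULL);
    int current = arrivals[0];  /* first value needs no reduce() call */
    for (size_t i = 1; i < n; i++)
        current = reduce(current, arrivals[i]);
    return current;
}
```

For the key 'a' the four map() outputs are 2, 2, 2, and 4. Because addition is associative and commutative, any arrival order gives 10; a reduce() without those properties could produce timing-dependent results.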


In this MP, you will be provided the data sets (already split up for you) and several example map() and reduce() functions. You will need to:


The MP

What we provide for you:

Besides the tester programs, we provide a data store, libds, for storing the results of your MapReduce. libds includes _get(), _put(), _update(), and _delete() operations that work similarly to the dictionary you built in MP1. However, these functions attach a revision number to each key. When you update or delete a key, you must provide the latest revision number you know about. If that revision number does not match the one in the data store, the operation fails and reports that the data your program knew about is out of date. You can find the full API to libds here.
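The revision check described above is a form of optimistic concurrency control. The sketch below illustrates the idea with a toy one-entry store; it is NOT libds, and every name in it (toy_entry, toy_update, toy_add) is made up for this example. Consult the libds API for the real function names, arguments, and return values.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Toy one-entry store (not libds): a value plus its revision number.
 * Revisions start at 1 so that 0 can signal a failed update. */
struct toy_entry {
    char value[32];
    unsigned long rev;
};

/* Replace the value only if the caller's revision is current. Returns the
 * new revision on success, or 0 if `known_rev` is out of date. */
unsigned long toy_update(struct toy_entry *e, const char *value,
                         unsigned long known_rev)
{
    assert(e != NULL && value != NULL);
    if (known_rev != e->rev)
        return 0;                        /* stale: nothing is changed */
    strncpy(e->value, value, sizeof e->value - 1);
    e->value[sizeof e->value - 1] = '\0';
    e->rev++;
    return e->rev;
}

/* Add `delta` to the stored number, re-reading and retrying whenever the
 * update is rejected because the revision moved underneath us. */
void toy_add(struct toy_entry *e, long delta)
{
    assert(e != NULL);
    for (;;) {
        unsigned long rev = e->rev;      /* "get": remember the revision */
        long current = strtol(e->value, NULL, 10);
        char buf[32];
        snprintf(buf, sizeof buf, "%ld", current + delta);
        if (toy_update(e, buf, rev) != 0)
            return;                      /* success; otherwise retry */
    }
}
```

The important pattern is the loop in toy_add(): read the value together with its revision, compute the new value, attempt the update, and start over if the store reports that the revision is stale.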

You will find we also provide a read_from_fd() helper function, which we will explain later in this file.

What you must do for this MP:

[Part 1]

This MP is divided into two parts. First, you must complete the five core functions that make up libmapreduce:

[Part 2]

Second, you must complete the map() and reduce() functions for test6.c, which finds the lines that contain any variation of "wonder" in the entire text of Alice in Wonderland. There are only three variations in this example: "wonder", "Wonder", and "WONDER". For testing, you can compare your result with the output of grep as follows:
grep "wonder" alice.txt
grep "Wonder" alice.txt
grep "WONDER" alice.txt
Please do not modify the main function.
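The matching rule for the three variations can be sketched as a small predicate. The helper name line_has_wonder is hypothetical, and how you split the input into lines and emit matches from map() is left to you. Like the grep commands above, this performs a substring match, so "Wonderland" counts as a match.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: return 1 if `line` contains "wonder", "Wonder", or
 * "WONDER" as a substring, and 0 otherwise. Other capitalizations (for
 * example "wOnDeR") are deliberately not matched. */
int line_has_wonder(const char *line)
{
    static const char *const variants[] = { "wonder", "Wonder", "WONDER" };

    assert(line != NULL);
    for (size_t i = 0; i < sizeof variants / sizeof variants[0]; i++) {
        if (strstr(line, variants[i]) != NULL)
            return 1;
    }
    return 0;
}
```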

Compile and Run

As always, compile using:
make clean
make
We provide five test cases: (NOTE: ./test5 may be slightly different depending on how you create your IPC mechanism.)

The expected outputs for test1 through test5 are:
[netid@linux1 mp7]$ ./test1
letters: 8
[netid@linux1 mp7]$ ./test2
letters: 21
[netid@linux1 mp7]$ ./test3
a: 10
b: 2
c: 2
d: 12
e: 10
f: 1
g: 2
h: 1
i: 5
j: 0
k: 1
l: 8
m: 0
n: 9
o: 18
p: 0
q: 0
r: 8
s: 5
t: 6
u: 2
v: 3
w: 4
x: 0
y: 2
z: 0
[netid@linux1 mp7]$ ./test4
the: 1804
and: 912
alice: 385
some-word-that-wont-exist: (null)
[netid@linux1 mp7]$ ./test5
Sleeping for 6 seconds (job #1)...
Sleeping for 5 seconds (job #2)...
Sleeping for 4 seconds (job #3)...
Sleeping for 3 seconds (job #4)...
value: (null)
value: (null)
value: (null)
value: 1
value: 2
value: 3
value: 4
value: 4
value: 4
value: 4

Grading

For valgrind memory grading, we only check that your program frees all memory in the original parent process. Run your valgrind tests with:

valgrind --child-silent-after-fork=yes --leak-check=full ./test#

Finally, remember that big red warning at the top of this page.