# MP7: MapReduce

### Due: Monday, November 18, 2013

IMPORTANT

You will be using fork() in this MP. You should understand what it means to fork bomb.

If you fork bomb the nightly autograder, you will be excluded from all future nightly autograders for this MP. You will be notified of this.
If you fork bomb the final autograder, you will get a 0 for this MP. No exceptions!
Once a fork bomb is detected, the autograder and all the other processes die. So nightly autograders may not always get out overnight.

Make sure to always run all five testers before committing your code.

## Introduction

In 2004, Google developed MapReduce, a general framework for processing large data sets on clusters of computers. We recommend reading the MapReduce article on Wikipedia for background information. However, we will explain everything you need to know below.

To explain what MapReduce does, we'll use a small dataset (the first few lines of a famous poem by Robert Frost):

Two roads diverged in a yellow wood,
And sorry I could not travel both
And be one traveler, long I stood
And looked down one as far as I could.

To run MapReduce, we first split the dataset into small pieces. For this example, we will split the dataset by the four lines of the poem:
• Data Set #1 Input: "Two roads diverged in a yellow wood,"
• Data Set #2 Input: "And sorry I could not travel both"
• Data Set #3 Input: "And be one traveler, long I stood"
• Data Set #4 Input: "And looked down one as far as I could."

As part of the input to any MapReduce program, a user will provide a map() function. This function will map the input into a series of (key, value) pairs. For this example, let the map() function simply count the number of each letter (a-z) in the data set.

This MapReduce algorithm will spawn 1 process per data set and run the map() function on each dataset:
• Data Set #1 (pid = 1001)
  Input: "Two roads diverged in a yellow wood,"
  map() Output: a: 2, b: 0, c: 0, d: 4, e: 3, f: 0, g: 1, h: 0, i: 2, j: 0, k: 0, l: 2, m: 0, n: 1, o: 5, p: 0, q: 0, r: 2, s: 1, t: 1, u: 0, v: 1, w: 3, x: 0, y: 1, z: 0
• Data Set #2 (pid = 1002)
  Input: "And sorry I could not travel both"
  map() Output: a: 2, b: 1, c: 1, d: 2, e: 1, f: 0, g: 0, h: 1, i: 1, j: 0, k: 0, l: 2, m: 0, n: 2, o: 4, p: 0, q: 0, r: 3, s: 1, t: 3, u: 1, v: 1, w: 0, x: 0, y: 1, z: 0
• Data Set #3 (pid = 1003)
  Input: "And be one traveler, long I stood"
  map() Output: a: 2, b: 1, c: 0, d: 2, e: 4, f: 0, g: 1, h: 0, i: 1, j: 0, k: 0, l: 2, m: 0, n: 3, o: 4, p: 0, q: 0, r: 2, s: 1, t: 2, u: 0, v: 1, w: 0, x: 0, y: 0, z: 0
• Data Set #4 (pid = 1004)
  Input: "And looked down one as far as I could."
  map() Output: a: 4, b: 0, c: 1, d: 4, e: 2, f: 1, g: 0, h: 0, i: 1, j: 0, k: 1, l: 2, m: 0, n: 3, o: 5, p: 0, q: 0, r: 1, s: 2, t: 0, u: 1, v: 0, w: 1, x: 0, y: 0, z: 0

Each map() process produces output, and that output becomes the input to the reduce() function. The reduce() function collects and combines the output of each process to form a single data set to return to the user. The output of reduce() should be a value in the same domain as its inputs. In this example, reduce() is called every time the same key is seen from two different processes, and our reduce() function simply adds the two values for that key (e.g., a: 2 and a: 4 will result in a: 6).

All reduce() calls are run in a single worker thread, within the process that called the MapReduce framework initially. The reduce() calls are not performed in a new or recycled map() process. Adding to our diagram:
• Data Set #1 (pid = 1001)
  Input: "Two roads diverged in a yellow wood,"
  map() Output: a: 2, b: 0, c: 0, d: 4, e: 3, f: 0, g: 1, h: 0, i: 2, j: 0, k: 0, l: 2, m: 0, n: 1, o: 5, p: 0, q: 0, r: 2, s: 1, t: 1, u: 0, v: 1, w: 3, x: 0, y: 1, z: 0
• Data Set #2 (pid = 1002)
  Input: "And sorry I could not travel both"
  map() Output: a: 2, b: 1, c: 1, d: 2, e: 1, f: 0, g: 0, h: 1, i: 1, j: 0, k: 0, l: 2, m: 0, n: 2, o: 4, p: 0, q: 0, r: 3, s: 1, t: 3, u: 1, v: 1, w: 0, x: 0, y: 1, z: 0
• Data Set #3 (pid = 1003)
  Input: "And be one traveler, long I stood"
  map() Output: a: 2, b: 1, c: 0, d: 2, e: 4, f: 0, g: 1, h: 0, i: 1, j: 0, k: 0, l: 2, m: 0, n: 3, o: 4, p: 0, q: 0, r: 2, s: 1, t: 2, u: 0, v: 1, w: 0, x: 0, y: 0, z: 0
• Data Set #4 (pid = 1004)
  Input: "And looked down one as far as I could."
  map() Output: a: 4, b: 0, c: 1, d: 4, e: 2, f: 1, g: 0, h: 0, i: 1, j: 0, k: 1, l: 2, m: 0, n: 3, o: 5, p: 0, q: 0, r: 1, s: 2, t: 0, u: 1, v: 0, w: 1, x: 0, y: 0, z: 0
• Worker thread (pid = 1000)
  reduce(), reduce(), reduce(), and more reduce()'ing ...
  Result: a: 10, b: 2, c: 2, d: 12, e: 10, f: 1, g: 2, h: 1, i: 5, j: 0, k: 1, l: 8, m: 0, n: 9, o: 18, p: 0, q: 0, r: 8, s: 5, t: 6, u: 2, v: 3, w: 4, x: 0, y: 2, z: 0

For each unique key, reduce() will be called multiple times, until the multiple values output by the map() processes are reduced down to a single value for that key. As an example, consider the key 'a'. The order in which reduce() is called might depend on when the processes finish, but one possible sequence of operations is shown in the diagram below. (This depicts what happens only for the key 'a'; the other keys would be similar.)
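To make the repeated reduction concrete, here is a minimal sketch in C that folds the four 'a' counts from the example (2, 2, 2, and 4) one at a time. The names add_counts() and fold_in_arrival_order() are hypothetical and not part of libmapreduce, and plain ints stand in for the strings the real reduce() operates on.

```c
#include <stddef.h>

/* Stand-in for the example's reduce(): add two letter counts. */
static int add_counts(int stored, int incoming)
{
    return stored + incoming;
}

/*
 * Fold the values for one key in the order they arrive. The first value is
 * stored as-is; every later value is reduced into the stored one.
 * Returns 0 if no value ever arrived.
 */
int fold_in_arrival_order(const int *arrivals, size_t n)
{
    if (n == 0)
        return 0;

    int stored = arrivals[0];
    for (size_t i = 1; i < n; i++)
        stored = add_counts(stored, arrivals[i]);
    return stored;
}
```

If the values happen to arrive as 2, 4, 2, 2 (an assumed order, say pids 1001, 1004, 1002, 1003), the stored value goes 2, 6, 8, 10. Any other arrival order also ends at 10, because this particular reduce() is just addition.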

In this MP, you will be provided the data sets (already split up for you) and several example map() and reduce() functions. You will need to:
• Create the map() processes. System call: fork()
• Read the result of the map() processes over a pipe or FIFO. System call: pipe() or mkfifo()
• Process the results of each process using only a single thread, by using I/O multiplexing. System calls: the epoll family (epoll_create(), epoll_ctl(), epoll_wait())
• Give the current set of completed results back to the user at any time, even if some of the map() processes are not yet finished.

## The MP

#### What we provide for you:

Besides the tester programs, we provide you a data store, libds, that allows you to store the results of your MapReduce. libds includes _get(), _put(), _update(), and _delete() operations that work similarly to the dictionary you built in MP1. However, these functions attach a revision number to each key. When you update or delete a key, you are required to provide the latest revision number you know about. If the revision number does not match what is in the data store, the update or delete operation will notify you that it could not complete because the data your program knew about is out of date. You can find the full API to libds here.

You will find we also provide a read_from_fd() helper function, which we will explain later in this file.

#### [Part 1]

This MP is divided into two parts. First, you must complete the five core functions that make up libmapreduce:
• mapreduce_init()
void mapreduce_init(mapreduce_t *mr,
void (*mymap)(int, const char *),
const char *(*myreduce)(const char *, const char *));

This function will be the first call made to the libmapreduce library. You should put any initialization logic here. (You will likely want to store the mymap and myreduce functions inside mr for later use; mapreduce_init() should not call either of these functions.)

The mymap function pointer is a pointer to a function of the following format:

void map(int fd, const char *data)
...where fd is the file descriptor to which map() will write the results of the map() operation on the dataset data. The map() function will always write the (key, value) result to fd in the format key: value\n. As a printf(), this would be:
printf("%s: %s\n", key, value);
You do not need to write mymap(); it is passed in as a function pointer to mapreduce_init(). Note that map() will always close the fd passed to it before returning.
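For illustration only, a map() function matching this contract might look like the sketch below. The name letter_count_map() is hypothetical; it is written here to mirror the letter-counting example from the introduction and is not the code used by the provided testers.

```c
#include <ctype.h>
#include <stdio.h>
#include <unistd.h>

/*
 * Illustrative map(): count each letter a-z in data (case-insensitively),
 * write one "key: value\n" line per letter to fd, then close fd.
 */
void letter_count_map(int fd, const char *data)
{
    int counts[26] = {0};

    for (const char *p = data; *p != '\0'; p++) {
        int c = tolower((unsigned char)*p);
        if (c >= 'a' && c <= 'z')
            counts[c - 'a']++;
    }

    for (int i = 0; i < 26; i++) {
        char line[32];
        int len = snprintf(line, sizeof(line), "%c: %d\n", 'a' + i, counts[i]);
        if (len > 0 && write(fd, line, (size_t)len) == -1)
            break;  /* write failed; stop producing output */
    }

    close(fd);  /* map() always closes the fd it was given */
}
```

Because the function closes fd itself, the reader on the other end of the pipe sees end-of-file once every other copy of the write end has also been closed.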

myreduce is a pointer to a function of the following format:
const char *reduce(const char *value1, const char *value2)
...where reduce() will return a newly malloc()'d region of memory that is the "reduction" of value1 and value2. Since this function will malloc() new memory, you will need to free() this memory at some later point.
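As a hedged illustration of the ownership rule, a reduce() that adds two decimal strings could be sketched as follows. The name sum_reduce() and the 32-byte result buffer are assumptions made for this example.

```c
#include <stdio.h>
#include <stdlib.h>

/*
 * Illustrative reduce(): interpret both values as decimal integers and
 * return their sum as a newly malloc()'d string (NULL if malloc() fails).
 * The caller owns the returned memory and must free() it.
 */
const char *sum_reduce(const char *value1, const char *value2)
{
    char *result = malloc(32);
    if (result == NULL)
        return NULL;

    snprintf(result, 32, "%ld", atol(value1) + atol(value2));
    return result;
}
```

A caller would use the result and then release it, for example by casting away const in free((void *)result) once the value has been copied or stored elsewhere.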

You do not need to write myreduce(); it is passed in as a function pointer to mapreduce_init().

• mapreduce_map_all()
void mapreduce_map_all(mapreduce_t *mr, const char **values);

This is the main function of the first part of this MP. This function will only be called once.

As input to this function, values contains a NULL-terminated array of C-strings. (If there are three values, values[0], values[1], and values[2] will point to C-strings of the data sets each of your map() processes should use, and values[3] will be equal to NULL.) Each of the strings in values will be one data set.

In this function, you must launch one process per data set, and one worker thread (the worker is a thread within the process that called mapreduce_map_all()). Each new process will call map() on one data set. The worker thread will use multiple calls to reduce() to process the data coming back from the map() processes you have launched.

In the description of the map() function, you saw that you will need to pass a file descriptor fd into map(). This file descriptor should be the writing side of a pipe or FIFO that you create in this function. Once the pipe has been set up and the process has been fork()'d, the child process should only need to run code similar to the following:

{
/* child */
map(fd, values[i]);
exit(0); /* exit the child process */
}
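Putting the pipe and the fork() together, one possible shape for launching a single map() process is sketched below. The helper spawn_mapper() and the tiny demo_map() are hypothetical names used only for this example; your own code may be organized quite differently.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical map(): report the length of data, then close fd. */
void demo_map(int fd, const char *data)
{
    char line[64];
    int len = snprintf(line, sizeof(line), "length: %zu\n", strlen(data));
    if (len > 0 && write(fd, line, (size_t)len) != (ssize_t)len)
        perror("write");
    close(fd);
}

/*
 * Create a pipe, fork one child that runs map_fn on data, and return the
 * read end of the pipe to the parent. Returns -1 on failure.
 */
int spawn_mapper(void (*map_fn)(int, const char *), const char *data)
{
    int fds[2];
    if (pipe(fds) == -1)
        return -1;

    pid_t pid = fork();
    if (pid == -1) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }

    if (pid == 0) {
        /* child: only writes, so drop the read end */
        close(fds[0]);
        map_fn(fds[1], data);  /* map() closes fds[1] before returning */
        exit(0);               /* the child must never fall back into the parent's code */
    }

    /* parent: only reads, so drop the write end */
    close(fds[1]);
    return fds[0];
}
```

Two details matter here. The child always exits after map(), which is what keeps a loop of fork() calls from turning into a fork bomb. The parent closes its copy of the write end right away, since the reader only sees end-of-file once every copy of the write end is closed.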

Since mapreduce_map_all() is only called once, you may find it easier to launch all the processes, set up all the pipes, and then launch the one worker thread that will be reading the map() results from each of the child processes. Since you have only a single thread, and must read results as soon as they're available, you are unable to simply make a blocking read() call.

Instead, you must use epoll (for example, epoll_wait()) to query which of the file descriptors are ready to be read without blocking. Since you are reading from N different streams, it may be useful to create N buffers (one for each process). If you create one buffer per process, we have provided a helper function to assist with reading the results of the map() processes:
int read_from_fd(int fd, char *buffer, mapreduce_t *mr)
...this function will read() data from fd and make a call to process_key_value() for each (key, value) that was read. If a line was only partially read, this function will leave the un-processed data in buffer and expects it to be read the next time the function is called. This function expects buffer to initially be of size BUFFER_SIZE + 1 and for buffer[0] == '\0'. Finally, the mr pointer is simply passed-through to process_key_value() as it may be useful to you, as you will need to write the logic to process the key and the value.

A call to read_from_fd() may look like read_from_fd(fds[i], buffer[i], mr). The return value of read_from_fd() will be 1 if data was successfully read. You'll notice that this function will blindly attempt to read() from the fd. It's up to you to only call this function if data is available to be read().
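The single-threaded read loop could be sketched roughly as follows. This version calls plain read() and hands each chunk to a callback so that it stays self-contained; in the MP, the read() call is where read_from_fd() would go. The names drain_with_epoll(), chunk_fn, and count_bytes() are hypothetical.

```c
#include <errno.h>
#include <sys/epoll.h>
#include <unistd.h>

typedef void (*chunk_fn)(int fd, const char *chunk, ssize_t len, void *ctx);

/* Hypothetical callback: add the size of each chunk to a running total. */
void count_bytes(int fd, const char *chunk, ssize_t len, void *ctx)
{
    (void)fd;
    (void)chunk;
    *(long *)ctx += (long)len;
}

/*
 * Watch every fd in fds with epoll and read from whichever one is ready,
 * until all of them reach end-of-file. Each fd is closed once it is drained.
 * Returns 0 on success, -1 on error.
 */
int drain_with_epoll(const int *fds, int nfds, chunk_fn on_chunk, void *ctx)
{
    int epfd = epoll_create1(0);
    if (epfd == -1)
        return -1;

    for (int i = 0; i < nfds; i++) {
        struct epoll_event ev = { .events = EPOLLIN, .data.fd = fds[i] };
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, fds[i], &ev) == -1) {
            close(epfd);
            return -1;
        }
    }

    int open_fds = nfds;
    while (open_fds > 0) {
        struct epoll_event ready;
        int n = epoll_wait(epfd, &ready, 1, -1);  /* block until one fd is ready */
        if (n < 1) {
            if (n == -1 && errno != EINTR) {
                close(epfd);
                return -1;
            }
            continue;  /* interrupted by a signal; wait again */
        }

        char chunk[256];
        ssize_t got = read(ready.data.fd, chunk, sizeof(chunk));
        if (got > 0) {
            on_chunk(ready.data.fd, chunk, got, ctx);
        } else {
            /* 0 means end-of-file, -1 means error: stop watching this fd */
            epoll_ctl(epfd, EPOLL_CTL_DEL, ready.data.fd, NULL);
            close(ready.data.fd);
            open_fds--;
        }
    }

    close(epfd);
    return 0;
}
```

The only call that blocks is epoll_wait(), and it returns as soon as any one pipe has data or has been closed by its writer, so a slow map() process never holds up results from a fast one.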

Regardless of whether you choose to use read_from_fd() or write your own function, each time a (key, value) pair is received, it must be updated in your internal data structure. If the (key, value) contains a key you have never seen before, this (key, value) should be stored. If the (key, value) contains a key already stored, you must call reduce() on the stored value and the newly read value and then update the value of key to the value returned by reduce().
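The store-or-reduce rule can be sketched with a toy in-memory table. This is not libds: toy_store_t, toy_merge(), toy_copy(), and toy_add() are hypothetical stand-ins, and the fixed 64-key limit is an assumption made to keep the example short.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define TOY_MAX_KEYS 64

typedef const char *(*reduce_fn)(const char *, const char *);

typedef struct {
    char *keys[TOY_MAX_KEYS];
    char *values[TOY_MAX_KEYS];
    size_t count;
} toy_store_t;

/* Return a malloc()'d copy of s, or NULL on allocation failure. */
static char *toy_copy(const char *s)
{
    size_t n = strlen(s) + 1;
    char *copy = malloc(n);
    if (copy != NULL)
        memcpy(copy, s, n);
    return copy;
}

/* Example reduce(): add two decimal strings into a new malloc()'d string. */
const char *toy_add(const char *value1, const char *value2)
{
    char *result = malloc(32);
    if (result != NULL)
        snprintf(result, 32, "%ld", atol(value1) + atol(value2));
    return result;
}

/*
 * Store (key, value) if key is new; otherwise replace the stored value with
 * reduce(stored, value). Returns 0 on success, -1 on failure.
 */
int toy_merge(toy_store_t *store, reduce_fn reduce,
              const char *key, const char *value)
{
    for (size_t i = 0; i < store->count; i++) {
        if (strcmp(store->keys[i], key) == 0) {
            const char *merged = reduce(store->values[i], value);
            if (merged == NULL)
                return -1;
            free(store->values[i]);  /* the old value is no longer needed */
            store->values[i] = (char *)merged;
            return 0;
        }
    }

    if (store->count == TOY_MAX_KEYS)
        return -1;

    char *k = toy_copy(key);
    char *v = toy_copy(value);
    if (k == NULL || v == NULL) {
        free(k);
        free(v);
        return -1;
    }
    store->keys[store->count] = k;
    store->values[store->count] = v;
    store->count++;
    return 0;
}
```

Note where the memory goes: the string returned by reduce() becomes the stored value, and the previous stored value is freed at that moment, so nothing leaks across repeated reductions of the same key.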

mapreduce_map_all() MUST NOT block waiting for the thread or processes to finish. In other words, mapreduce_map_all() will typically return before the child processes and the worker thread have completed.

• mapreduce_reduce_all()
void mapreduce_reduce_all(mapreduce_t *mr)

This function will always be called only once, and will always be called sometime after mapreduce_map_all() is called.

This function should block until all map()'ing and reduce()'ing has completed.

• mapreduce_get_value()
const char *mapreduce_get_value(mapreduce_t *mr, const char *result_key)

This function should return the current value of result_key. If the result_key does not exist, return NULL. If a string is returned, it should be a newly allocated string that will need to be freed by the caller. This function may be called at any time, including while the map() processes are working (as more map()'s finish, the value for the same key will likely change between calls to this function).

• mapreduce_destroy()
void mapreduce_destroy(mapreduce_t *mr)

Free all your memory. :) This function will always be called last.

#### [Part 2]

Secondly, you must complete the map() and reduce() functions for test6.c, which aims to find the longest word in the entire text of Alice in Wonderland:
• map()
void map(int fd, const char *data)

This function should take its input from data and map the longest word to the key long_key. It should then write the (key, value) pair to fd in the form long_key: XXXXXXXXX. A word is defined as a contiguous block of alphabetic characters ([a-zA-Z]+). You can use the function isalpha() to test a single character.

• reduce()
const char *reduce(const char *value1, const char *value2)

This function should take two words and reduce to the longer of the two.

For the second part, the test cases that we have provided are good examples for you to follow, especially test4.c.

## Compile and Run

As always, compile using:
make clean
make
We provide five test cases:
• test1: One dataset and nothing to reduce().
• test2: Two datasets with only one key, resulting in one reduce().
• test3: A tester that runs the example in the beginning of this file (first four lines of Robert Frost's poem).
• test4: A tester running MapReduce on the entire text of Alice in Wonderland.
• test5: A tester testing if calls to mapreduce_get_value() update as map() processes finish at different times.
(NOTE: ./test5 may be slightly different depending on how you create your IPC mechanism.)

The expected outputs for test1 through test5 are:
[netid@linux1 mp7]$ ./test1
letters: 8
[netid@linux1 mp7]$ ./test2
letters: 21
[netid@linux1 mp7]$ ./test3
a: 10
b: 2
c: 2
d: 12
e: 10
f: 1
g: 2
h: 1
i: 5
j: 0
k: 1
l: 8
m: 0
n: 9
o: 18
p: 0
q: 0
r: 8
s: 5
t: 6
u: 2
v: 3
w: 4
x: 0
y: 2
z: 0
[netid@linux1 mp7]$ ./test4
the: 1804
and: 912
alice: 385
some-word-that-wont-exist: (null)
[netid@linux1 mp7]$ ./test5
Sleeping for 6 seconds (job #1)...
Sleeping for 5 seconds (job #2)...
Sleeping for 4 seconds (job #3)...
Sleeping for 3 seconds (job #4)...
value: (null)
value: (null)
value: (null)
value: 1
value: 2
value: 3
value: 4
value: 4
value: 4
value: 4