CS 241

MP7: MapReduce

Due Tue, Apr 17 2012 at 11:59 pm


IMPORTANT

You will be using fork() in this MP. You should understand what it means to fork bomb.

If you fork bomb the nightly autograder, you may be excluded from all future nightly autograders for this MP.
If you fork bomb the final autograder, you will get a 0 for this MP.
...once a fork bomb is detected, the autograder and all the other processes die, so nightly autograder results may not always go out overnight.

Make sure to always run all five testers before committing your code.

Introduction

In 2004, Google developed MapReduce, a general framework for processing large data sets on clusters of computers. You can read more about MapReduce on Wikipedia, but we will explain everything you need to know below.

To explain what MapReduce does, we'll use a small dataset (the first few lines of a famous poem by Robert Frost):

Two roads diverged in a yellow wood,
And sorry I could not travel both
And be one traveler, long I stood
And looked down one as far as I could.


To run MapReduce, we first split the dataset into small pieces. For this example, we will split the dataset by the four lines of the poem:
Data Set #1
Input: "Two roads diverged in a yellow wood,"
Data Set #2
Input: "And sorry I could not travel both"
Data Set #3
Input: "And be one traveler, long I stood"
Data Set #4
Input: "And looked down one as far as I could."


As part of the input to any MapReduce program, a user will provide a map() function. This function will map the input into a series of (key, value) pairs. For this example, let the map() function simply count the occurrences of each letter (a-z) in the data set, ignoring case.

This MapReduce algorithm will spawn 1 process per data set and run the map() function on each dataset:
Data Set #1
Input: "Two roads diverged in a yellow wood,"
pid = 1001
map() Output: a: 2, b: 0, c: 0, d: 4, e: 3, f: 0, g: 1, h: 0, i: 2, j: 0, k: 0, l: 2, m: 0, n: 1, o: 5, p: 0, q: 0, r: 2, s: 1, t: 1, u: 0, v: 1, w: 3, x: 0, y: 1, z: 0
Data Set #2
Input: "And sorry I could not travel both"
pid = 1002
map() Output: a: 2, b: 1, c: 1, d: 2, e: 1, f: 0, g: 0, h: 1, i: 1, j: 0, k: 0, l: 2, m: 0, n: 2, o: 4, p: 0, q: 0, r: 3, s: 1, t: 3, u: 1, v: 1, w: 0, x: 0, y: 1, z: 0
Data Set #3
Input: "And be one traveler, long I stood"
pid = 1003
map() Output: a: 2, b: 1, c: 0, d: 2, e: 4, f: 0, g: 1, h: 0, i: 1, j: 0, k: 0, l: 2, m: 0, n: 3, o: 4, p: 0, q: 0, r: 2, s: 1, t: 2, u: 0, v: 1, w: 0, x: 0, y: 0, z: 0
Data Set #4
Input: "And looked down one as far as I could."
pid = 1004
map() Output: a: 4, b: 0, c: 1, d: 4, e: 2, f: 1, g: 0, h: 0, i: 1, j: 0, k: 1, l: 2, m: 0, n: 3, o: 5, p: 0, q: 0, r: 1, s: 2, t: 0, u: 1, v: 0, w: 1, x: 0, y: 0, z: 0
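The per-letter counts above can be reproduced with a few lines of C. This is only a sketch of the counting step; the name count_letters() is hypothetical, and it does not write anything to a file descriptor the way a real map() would.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>

/* Count occurrences of each letter a-z in data, ignoring case.
 * counts must have room for 26 entries: counts[0] is 'a', counts[25] is 'z'. */
void count_letters(const char *data, int counts[26])
{
    assert(data != NULL && counts != NULL);

    for (size_t i = 0; i < 26; i++)
        counts[i] = 0;

    for (const char *p = data; *p != '\0'; p++) {
        int c = tolower((unsigned char)*p);
        if (c >= 'a' && c <= 'z')   /* skip spaces, commas, periods, ... */
            counts[c - 'a']++;
    }
}
```

Calling count_letters() on "Two roads diverged in a yellow wood," fills counts with the values listed for Data Set #1 (for example a: 2, d: 4, o: 5, w: 3).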


Each map() process produces output, and that output becomes the input to the reduce() function. The reduce() function collects and combines the output of each process to form a single data set to return to the user. The output of reduce() should be a value in the same domain as its inputs. In this example, reduce() is called every time the same key is seen from two different processes, and our reduce() function simply adds the two values for that key (e.g., a: 2 and a: 4 result in a: 6).

All reduce() calls are run in a single worker thread, within the process that called the MapReduce framework initially. The reduce() calls are not performed in a new or recycled map() process. Adding to our diagram:
Data Set #1
Input: "Two roads diverged in a yellow wood,"
pid = 1001
map() Output: a: 2, b: 0, c: 0, d: 4, e: 3, f: 0, g: 1, h: 0, i: 2, j: 0, k: 0, l: 2, m: 0, n: 1, o: 5, p: 0, q: 0, r: 2, s: 1, t: 1, u: 0, v: 1, w: 3, x: 0, y: 1, z: 0
Data Set #2
Input: "And sorry I could not travel both"
pid = 1002
map() Output: a: 2, b: 1, c: 1, d: 2, e: 1, f: 0, g: 0, h: 1, i: 1, j: 0, k: 0, l: 2, m: 0, n: 2, o: 4, p: 0, q: 0, r: 3, s: 1, t: 3, u: 1, v: 1, w: 0, x: 0, y: 1, z: 0
Data Set #3
Input: "And be one traveler, long I stood"
pid = 1003
map() Output: a: 2, b: 1, c: 0, d: 2, e: 4, f: 0, g: 1, h: 0, i: 1, j: 0, k: 0, l: 2, m: 0, n: 3, o: 4, p: 0, q: 0, r: 2, s: 1, t: 2, u: 0, v: 1, w: 0, x: 0, y: 0, z: 0
Data Set #4
Input: "And looked down one as far as I could."
pid = 1004
map() Output: a: 4, b: 0, c: 1, d: 4, e: 2, f: 1, g: 0, h: 0, i: 1, j: 0, k: 1, l: 2, m: 0, n: 3, o: 5, p: 0, q: 0, r: 1, s: 2, t: 0, u: 1, v: 0, w: 1, x: 0, y: 0, z: 0
pid = 1000, worker thread
reduce(), reduce(), reduce(), and more reduce()'ing
... ... ...
Result: a: 10, b: 2, c: 2, d: 12, e: 10, f: 1, g: 2, h: 1, i: 5, j: 0, k: 1, l: 8, m: 0, n: 9, o: 18, p: 0, q: 0, r: 8, s: 5, t: 6, u: 2, v: 3, w: 4, x: 0, y: 2, z: 0


For each unique key, reduce() will be called multiple times, until the multiple values output by the map()s are reduced down to a single value for that key. As an example, consider the key 'a'. The order in which reduce() is called may depend on when the processes finish, but one possible sequence of operations is shown in the diagram below. (This depicts what happens only for the key 'a'; the other keys would be similar.)
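To make the repeated reduction concrete, here is a small sketch that folds the four 'a' values (2, 2, 2, 4) in arrival order. It is a simplification: it uses long values instead of the C-strings that reduce() takes in this MP, and the names add_values() and fold_values() are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

typedef long (*combine_fn)(long, long);

/* Reduce step for the letter-count example: add the two values. */
long add_values(long value1, long value2)
{
    return value1 + value2;
}

/* Reduce n values for one key, in arrival order. The first value is stored
 * unchanged; each later value is combined with the stored one.
 * *calls receives the number of combine() calls made, which is n - 1. */
long fold_values(const long *values, size_t n, combine_fn combine, size_t *calls)
{
    assert(values != NULL && n > 0 && calls != NULL);

    long stored = values[0];
    *calls = 0;
    for (size_t i = 1; i < n; i++) {
        stored = combine(stored, values[i]);
        (*calls)++;
    }
    return stored;
}
```

With addition as the reduce step, the arrival orders {2, 2, 2, 4} and {4, 2, 2, 2} both end at 10 after three calls, which is why the final result does not depend on which map() process finishes first.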


In this MP, you will be provided the data sets (already split up for you) and several example map() and reduce() functions. You will need to:
  • Create the map() processes. System call: fork()
  • Read the result of the map() processes over a pipe or fifo. System call: pipe() or mkfifo()
  • Process the results of each process using only a single thread, by using I/O multiplexing. System call: select() or poll()
  • Give the current set of completed results back to the user at any time, even if some of the map() processes are not yet finished.
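The first two steps in the list above can be sketched as follows. This is a minimal illustration, not the required structure of your solution: spawn_map() and length_map() are hypothetical names, and the sketch handles a single data set. It calls _exit() in the child so that the child can never fall back into the parent's code and fork again, which is how accidental fork bombs usually start.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical map(): writes "length: N\n" for its data set, then closes fd. */
void length_map(int fd, const char *data)
{
    char line[64];
    int len = snprintf(line, sizeof(line), "length: %zu\n", strlen(data));
    if (len > 0 && write(fd, line, (size_t)len) != (ssize_t)len)
        perror("write");
    close(fd);
}

/* Fork one child that runs map_fn on data. On success, stores the child's
 * pid in *child and returns the read end of the pipe; returns -1 on error. */
int spawn_map(void (*map_fn)(int, const char *), const char *data, pid_t *child)
{
    assert(map_fn != NULL && data != NULL && child != NULL);

    int fds[2];
    if (pipe(fds) == -1)
        return -1;

    pid_t pid = fork();
    if (pid == -1) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }
    if (pid == 0) {
        /* child: only writes, so close the read end first */
        close(fds[0]);
        map_fn(fds[1], data);
        _exit(0);   /* never return into the parent's code path */
    }
    /* parent: only reads; closing the write end lets read() see EOF later */
    close(fds[1]);
    *child = pid;
    return fds[0];
}
```

The parent keeps only the read end. If it left its copy of the write end open, a read() on the pipe would never report end-of-file, even after the child exits.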


What you must do...

What we provide for you:

Besides the tester programs, we provide you with a libdictionary much like the one you made in MP1. You will find there are seven functions:

  • dictionary_init()
    void dictionary_init(dictionary_t *d);

    Must be called first, initializes the dictionary data structure. Same as MP1.

  • dictionary_add()
    int dictionary_add(dictionary_t *d, const char *key, const char *value);

    Adds a (key, value) pair to the dictionary. Returns 0 on success or KEY_EXISTS if the key already exists in the dictionary. Same as MP1.

  • dictionary_get()
    const char *dictionary_get(dictionary_t *d, const char *key);

    Returns the stored value associated with the key if the key exists in the dictionary. If the key does not exist, this function will return NULL. Same as MP1.

  • dictionary_remove()
    int dictionary_remove(dictionary_t *d, const char *key);

    Removes the (key, value) entry from the dictionary. Returns 0 on success or NO_KEY_EXISTS if the key was not present in the dictionary. This function does not free the memory used by key or value. Same as MP1.

  • dictionary_remove_free()
    int dictionary_remove_free(dictionary_t *d, const char *key);

    Removes the (key, value) entry from the dictionary. Returns 0 on success or NO_KEY_EXISTS if the key was not present in the dictionary. This function WILL call free(key) and free(value) on the (key, value) pair stored in the dictionary before returning. (NOTE: This function is unsafe if you have used non-heap memory when calling dictionary_add().)

  • dictionary_destroy()
    void dictionary_destroy(dictionary_t *d);

    Frees all internal memory associated with the dictionary. Must be called last. Same as MP1.

  • dictionary_destroy_free()
    void dictionary_destroy_free(dictionary_t *d);

    Frees all internal memory associated with the dictionary AND makes a call to dictionary_remove_free() for each entry that still exists in the dictionary at the time dictionary_destroy_free() is called. Since this function makes use of dictionary_remove_free(), this function is unsafe if you have used non-heap memory when calling dictionary_add().

Additionally, all of the libdictionary functions except _init() and _destroy() are thread-safe.

You will find we also provide a read_from_fd() helper function, which we will explain later in this file.

What you must do for this MP:

This MP is divided into two parts. First, you must complete the five core functions that make up libmapreduce:
  • mapreduce_init()
    void mapreduce_init(mapreduce_t *mr,
                        void (*mymap)(int, const char *),
                        const char *(*myreduce)(const char *, const char *));

    This function will be the first call made to the libmapreduce library. You should put any initialization logic here. (You will likely want to store the mymap and myreduce functions inside mr for later use; mapreduce_init() should not call either of these functions.)


    The mymap function pointer is a pointer to a function of the following format:

    void map(int fd, const char *data)
    ...where fd is the file descriptor to which map() will write the results of the map() operation on the dataset data. The map() function will always write the (key, value) result to fd in the format key: value\n. As a printf(), this would be:
    printf("%s: %s\n", key, value);
    You do not need to write mymap(); it is passed in as a function pointer to mapreduce_init(). You should note that map() will always close the fd passed to it before returning.


    myreduce is a pointer to a function of the following format:
    const char *reduce(const char *value1, const char *value2)
    ...where reduce() will return a newly malloc()'d region of memory that is the "reduction" of value1 and value2. Since this function will malloc() new memory, you will need to free() this memory at some later point.

    You do not need to write myreduce; it is passed in as a function pointer to mapreduce_init().
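Storing the two function pointers for later use might look like the sketch below. The struct sketch_mr_t and the function sketch_init() are hypothetical stand-ins; the real mapreduce_t is defined in the provided header and will need more fields than this. The two sample callbacks exist only so the sketch can be exercised.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical stand-in for mapreduce_t: holds only the two callbacks. */
typedef struct {
    void (*map)(int, const char *);
    const char *(*reduce)(const char *, const char *);
} sketch_mr_t;

/* Remember the callbacks; neither one is called here. */
void sketch_init(sketch_mr_t *mr,
                 void (*mymap)(int, const char *),
                 const char *(*myreduce)(const char *, const char *))
{
    assert(mr != NULL);
    mr->map = mymap;
    mr->reduce = myreduce;
}

/* Sample map(): produces no pairs, but still closes fd as map() must. */
void empty_map(int fd, const char *data)
{
    (void)data;
    close(fd);
}

/* Sample reduce(): returns a malloc()'d copy of value1, ignoring value2. */
const char *keep_first(const char *value1, const char *value2)
{
    (void)value2;
    char *copy = malloc(strlen(value1) + 1);
    if (copy != NULL)
        strcpy(copy, value1);
    return copy;
}
```

Later code can then call mr->map(fd, data) in a child process and mr->reduce(a, b) in the worker thread without knowing which functions the user supplied.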


  • mapreduce_map_all()
    void mapreduce_map_all(mapreduce_t *mr, const char **values);

    This is the main function of the first part of this MP. This function will only be called once.

    As input to this function, values contains a NULL-terminated array of C-strings. (If there are three values, values[0], values[1], values[2] will point to C-strings of the data sets each of your map() processes should use and values[3] will be equal to NULL.) Each of the strings in values will be one data set.

    In this function, you must launch one process per data set, and one worker thread (the worker is a thread within the process that called mapreduce_map_all()). Each new process will call map() on one data set. The worker thread will use multiple calls to reduce() to process the data coming back from the map() processes you have launched.

    In the description of the map() function, you saw that you will need to pass a file descriptor fd into map(). This file descriptor should be the writing side of a pipe or fifo that you create in this function. Once the pipe has been set up and you have called fork(), the child process should only need to run code similar to the following:

    {
      /* child */
      map(fd, values[i]);
      exit(0); /* exit the child process */
    }

    Since mapreduce_map_all() is only called once, you may find it easier to launch all the processes, set up all the pipes, and then launch the one worker thread that will be reading the map() results from each of the child processes. Since you have only a single thread, and must read results as soon as they're available, you are unable to simply make a blocking read() call.

    Instead, you should use select() or poll() to query which of the file descriptors are ready to be read without blocking. Since you are reading from N different streams, it may be useful to create N buffers (one for each process). If you create one buffer per process, we have provided a helper function to assist with reading the results of the map() processes:
    int read_from_fd(int fd, char *buffer, mapreduce_t *mr)
    ...this function will read() data from fd and make a call to process_key_value() for each (key, value) that was read. If a line was only partially read, this function will leave the un-processed data in buffer and expects it to be read the next time the function is called. This function expects buffer to initially be of size BUFFER_SIZE + 1 and for buffer[0] == '\0'. Finally, the mr pointer is simply passed-through to process_key_value() as it may be useful to you, as you will need to write the logic to process the key and the value.

    A call to read_from_fd() may look like read_from_fd(fds[i], buffer[i], mr). The return value of read_from_fd() will be 1 if data was successfully read. You'll notice that this function will blindly attempt to read() from the fd. It's up to you to only call this function if data is available to be read().
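One way to call read() only on descriptors that are ready is a select() loop like the sketch below. It is an illustration under simplifying assumptions: drain_all() is a hypothetical name, it only counts bytes instead of parsing (key, value) pairs, it does not retry when select() is interrupted by a signal, and it assumes every descriptor is below FD_SETSIZE.

```c
#include <assert.h>
#include <sys/select.h>
#include <unistd.h>

/* Read from every open descriptor in fds[0..n-1] until each reaches EOF,
 * calling read() only on descriptors that select() reports as ready.
 * Finished descriptors are closed and set to -1. Returns the total number
 * of bytes read, or -1 if select() or read() fails. */
long drain_all(int *fds, int n)
{
    assert(fds != NULL);

    long total = 0;
    int open_count = 0;
    for (int i = 0; i < n; i++)
        if (fds[i] >= 0)
            open_count++;

    while (open_count > 0) {
        fd_set readable;
        FD_ZERO(&readable);
        int max_fd = -1;
        for (int i = 0; i < n; i++) {
            if (fds[i] >= 0) {
                FD_SET(fds[i], &readable);
                if (fds[i] > max_fd)
                    max_fd = fds[i];
            }
        }

        /* Block until at least one descriptor has data or has hit EOF. */
        if (select(max_fd + 1, &readable, NULL, NULL, NULL) == -1)
            return -1;

        for (int i = 0; i < n; i++) {
            if (fds[i] < 0 || !FD_ISSET(fds[i], &readable))
                continue;
            char chunk[256];
            ssize_t got = read(fds[i], chunk, sizeof(chunk));
            if (got < 0)
                return -1;
            if (got == 0) {     /* EOF: the writer closed its end */
                close(fds[i]);
                fds[i] = -1;
                open_count--;
            } else {
                total += got;   /* a real worker would process chunk here */
            }
        }
    }
    return total;
}
```

The fd_set is rebuilt on every pass because select() overwrites it with the set of ready descriptors. In your worker thread, the read() call is where a call such as read_from_fd(fds[i], buffer[i], mr) would go.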


    Regardless of whether you choose to use read_from_fd() or write your own function, each time a (key, value) pair is received, it must be updated in your internal data structure. If the (key, value) contains a key you have never seen before, this (key, value) should be stored. If the (key, value) contains a key already stored, you must call reduce() on the stored value and the newly read value and then update the value of key to the value returned by reduce().
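The store-or-reduce rule can be sketched with a tiny fixed-size table. Everything here is hypothetical (store_t, store_update(), store_get(), store_free(), sum_reduce(), the MAX_KEYS limit); in the MP you would more likely keep the values in the provided libdictionary. The part worth noting is the ownership: the value returned by reduce() replaces the stored one, and the old stored value is freed.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define MAX_KEYS 64   /* hypothetical capacity for this sketch */

typedef struct {
    char *keys[MAX_KEYS];
    char *values[MAX_KEYS];
    size_t count;
} store_t;

typedef const char *(*reduce_fn)(const char *, const char *);

static char *copy_string(const char *s)
{
    char *copy = malloc(strlen(s) + 1);
    if (copy != NULL)
        strcpy(copy, s);
    return copy;
}

/* Sample reduce(): adds two decimal strings into newly malloc()'d memory. */
const char *sum_reduce(const char *value1, const char *value2)
{
    long sum = strtol(value1, NULL, 10) + strtol(value2, NULL, 10);
    char *result = malloc(32);
    if (result != NULL)
        snprintf(result, 32, "%ld", sum);
    return result;
}

/* New key: store a copy of (key, value). Known key: replace the stored value
 * with reduce(stored, value) and free the old one. Returns 0, or -1 on error. */
int store_update(store_t *s, reduce_fn reduce, const char *key, const char *value)
{
    assert(s != NULL && reduce != NULL && key != NULL && value != NULL);

    for (size_t i = 0; i < s->count; i++) {
        if (strcmp(s->keys[i], key) == 0) {
            char *combined = (char *)reduce(s->values[i], value);
            if (combined == NULL)
                return -1;
            free(s->values[i]);
            s->values[i] = combined;
            return 0;
        }
    }
    if (s->count == MAX_KEYS)
        return -1;
    char *k = copy_string(key);
    char *v = copy_string(value);
    if (k == NULL || v == NULL) {
        free(k);
        free(v);
        return -1;
    }
    s->keys[s->count] = k;
    s->values[s->count] = v;
    s->count++;
    return 0;
}

/* Current value for key, or NULL if the key has not been seen. */
const char *store_get(const store_t *s, const char *key)
{
    for (size_t i = 0; i < s->count; i++)
        if (strcmp(s->keys[i], key) == 0)
            return s->values[i];
    return NULL;
}

void store_free(store_t *s)
{
    for (size_t i = 0; i < s->count; i++) {
        free(s->keys[i]);
        free(s->values[i]);
    }
    s->count = 0;
}
```

Feeding the pairs (a, 2), (a, 4), and (b, 1) through store_update() with sum_reduce() leaves a: 6 and b: 1 in the table, matching the a: 2 and a: 4 example from the introduction.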

    mapreduce_map_all() MUST NOT block waiting for the thread or processes to finish. In other words, mapreduce_map_all() will typically return before the child processes and the worker thread have completed.


  • mapreduce_reduce_all()
    void mapreduce_reduce_all(mapreduce_t *mr)

    This function will always be called only once, and will always be called sometime after mapreduce_map_all() is called.

    This function should block until all map()'ing and reduce()'ing has completed.


  • mapreduce_get_value()
    const char *mapreduce_get_value(mapreduce_t *mr, const char *result_key)

    This function should return the current value of result_key. If the result_key does not exist, return NULL. This function may be called at any time, including while the map() processes are working (as more map()'s finish, the value for the same key will likely change between calls to this function).


  • mapreduce_destroy()
    void mapreduce_destroy(mapreduce_t *mr)

    Free all your memory. :)  Will always be called last.

Secondly, you must complete the map() and reduce() functions for test6.c, which aims to find the longest word in the entire text of Alice in Wonderland:
  • map()
    void map(int fd, const char *data)

    This function should read its input from data, find the longest word, and map it to the key long_key. It should then write the key: value pair to fd in the form long_key: XXXXXXXXX. A word is defined as a contiguous block of alphabetic characters ([a-zA-Z]+). You can use the function isalpha() to test a single character.

  • reduce()
    const char *reduce(const char *value1, const char *value2)

    This function should take two words and reduce to the longer of the two.
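The word-scanning and comparison logic described above could look roughly like this. It is a sketch, not the test6.c solution: longest_word() and longer_reduce() are hypothetical names, nothing is written to fd, and the tie-breaking rule (the earlier word wins) is an assumption the handout does not specify.

```c
#include <assert.h>
#include <ctype.h>
#include <stdlib.h>
#include <string.h>

/* Find the longest run of alphabetic characters in data. Stores a pointer to
 * its first character in *start and returns its length (0 if data contains
 * no letters). On a tie, the earliest word wins. */
size_t longest_word(const char *data, const char **start)
{
    assert(data != NULL && start != NULL);

    size_t best_len = 0;
    const char *best = data;
    const char *p = data;

    while (*p != '\0') {
        if (!isalpha((unsigned char)*p)) {
            p++;                        /* skip separators */
            continue;
        }
        const char *word = p;
        while (isalpha((unsigned char)*p))
            p++;                        /* stops at '\0' or a non-letter */
        size_t len = (size_t)(p - word);
        if (len > best_len) {
            best_len = len;
            best = word;
        }
    }
    *start = best;
    return best_len;
}

/* Returns a malloc()'d copy of the longer of the two words (value1 on a
 * tie), or NULL if allocation fails. */
const char *longer_reduce(const char *value1, const char *value2)
{
    const char *winner = strlen(value2) > strlen(value1) ? value2 : value1;
    char *copy = malloc(strlen(winner) + 1);
    if (copy != NULL)
        strcpy(copy, winner);
    return copy;
}
```

The result of longest_word() is a pointer into data plus a length rather than a NUL-terminated string, so a map() built on it would print with a precision, for example "%.*s".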

For the second part, the test cases that we have provided are good examples for you to follow, especially test4.c

Compile and Run

As always, compile using:
make clean
make
We provide six test cases (the WikiTalk test is newly added as of April 12):
  • test1: One dataset and nothing to reduce().
  • test2: Two datasets with only one key, resulting in one reduce().
  • test3: A tester that runs the example in the beginning of this file (first four lines of Robert Frost's poem).
  • test4: A tester running MapReduce on the entire text of Alice in Wonderland.
  • test5: A tester testing if calls to mapreduce_get_value() update as map() processes finish at different times.
  • WikiTalk: A tester running MapReduce on the Wikipedia Talk Network.
For test1 through test5, the expected outputs are:
[netid@linux1 mp7]$ ./test1
letters: 8
[netid@linux1 mp7]$ ./test2
letters: 21
[netid@linux1 mp7]$ ./test3
a: 10
b: 2
c: 2
d: 12
e: 10
f: 1
g: 2
h: 1
i: 5
j: 0
k: 1
l: 8
m: 0
n: 9
o: 18
p: 0
q: 0
r: 8
s: 5
t: 6
u: 2
v: 3
w: 4
x: 0
y: 2
z: 0
[netid@linux1 mp7]$ ./test4
the: 1804
and: 912
alice: 385
some-word-that-wont-exist: (null)
[netid@linux1 mp7]$ ./test5
Sleeping for 6 seconds (job #1)...
Sleeping for 5 seconds (job #2)...
Sleeping for 4 seconds (job #3)...
Sleeping for 3 seconds (job #4)...
value: (null)
value: (null)
value: (null)
value: 1
value: 2
value: 3
value: 4
value: 4
value: 4
value: 4
(NOTE: ./test5 may be slightly different depending on how you create your IPC mechanism.)

For WikiTalk, you will be given a file which represents a social network: specifically, relationships between users on Wikipedia who have chatted with each other from the inception of Wikipedia until January 2008. You can access the test file on an EWS machine at /home/pbg/cs241/wikitalk.txt. Nodes in the network represent Wikipedia users and a directed edge from node i to node j means that user i edited the talk page of user j at least once. (The file is anonymized, with numbers representing users.) The first ten lines of wikitalk.txt look like this:

0 10
2 10
2 21
2 46
2 63
2 88
2 93
2 94
2 101
2 102
where each row of two nodes forms a directed edge A->B, meaning that user A edited the talk page of user B. We can apply MapReduce to compute the out-edits and in-edits of Wikipedia users. The out-edits of a particular user i is the number of other users whose talk pages i edited. Conversely, the in-edits of user i is the number of other users who edited i's talk page.
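To check your understanding of the two definitions, the sketch below counts out-edits or in-edits for one node directly from an edge list held in memory. It is not how WikiTalk.c works and it does not use MapReduce; count_edits() is a hypothetical helper that assumes each edge appears at most once in the input.

```c
#include <assert.h>
#include <stdio.h>

/* Count the edges in an "A B\n" edge list that involve node.
 * If outgoing is nonzero, count lines whose source A is node (out-edits);
 * otherwise count lines whose target B is node (in-edits). */
long count_edits(const char *edges, long node, int outgoing)
{
    assert(edges != NULL);

    long count = 0;
    const char *p = edges;
    long from, to;
    int used;

    /* %n stores how many characters this call consumed, so p can advance
     * to the next pair; the loop ends when no complete pair is left. */
    while (sscanf(p, "%ld %ld%n", &from, &to, &used) == 2) {
        if ((outgoing ? from : to) == node)
            count++;
        p += used;
    }
    return count;
}
```

On the ten sample lines shown above, node 2 has 9 out-edits and node 10 has 2 in-edits. The full file gives larger numbers because it contains many more edges.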

Before running, you should compile by using:
make clean
make
and then type the command
gcc -g -W -Wall WikiTalk.c libdictionary.o libmapreduce.o -o WikiTalk -lpthread
For WikiTalk, we can check the number of out-edits for any node by:
[netid@linux1 mp7]$ ./WikiTalk /home/pbg/cs241/wikitalk.txt out
Then you are asked to input the id of the node (an arbitrary number). And the expected output is:
> Checking the number of out-edits, please wait...
> Please enter a number (or any character to exit): 1
> result: (null)
> Please enter a number (or any character to exit): 2
> 2: 110
> Please enter a number (or any character to exit): 3
> 3: 3618
> Please enter a number (or any character to exit):
Similarly, when you try to check the number of in-edits for any node, the expected output is:
[netid@linux1 mp7]$ ./WikiTalk /home/pbg/cs241/wikitalk.txt in
> Checking the number of in-edits, please wait...
> Please enter a number (or any character to exit): 1
> 1: 5
> Please enter a number (or any character to exit): 2
> 2: 126
> Please enter a number (or any character to exit): 3
> 3: 1366
> Please enter a number (or any character to exit): 192301231
> result: (null)
(NOTE: The number, which is the node ID you entered, is arbitrary. If that node does not exist, the result should be (null). Also note that WikiTalk will be part of your final credit.)

Grading

For valgrind memory grading, we will only be testing if your program cleaned up all memory in your original, parent process. You should run:

valgrind --child-silent-after-fork=yes --leak-check=full ./test#
...when running your valgrind tests.

Finally, remember that big red warning at the top of this page.

Grading, Submission, and Other Details

Please fully read cs241.html for more details on grading, submission, and other topics that are shared between all MPs in CS 241.