MP 1: Introduction to MapReduce

Introduction

This MP will introduce the map/reduce computing paradigm. In essence, map/reduce breaks tasks down into a map phase and a reduce phase. Running our computation within this framework allows us to parallelize data processing, as we’ll see in a bit.

For this week, we’ll be doing all computation locally (on your laptop/computer/EWS). In the future, we can take this same code and run it on larger datasets on our Hadoop cluster.

Setup

To get set up, make sure that you have Python 3 (not Python 2) installed on your system. You can use an EWS workstation if you’d find that easier.

Furthermore, install the mrjob Python package like this:

pip install mrjob

Note: If you’re on EWS, or if Python 3 is not your system’s default Python, you should install mrjob like this:

pip3 install mrjob --user

… then you should use the python3 command instead of python when running your code.

Key/Value Pairs

MapReduce operates in the context of <key, value> pairs. Most data can be represented in such a fashion. For example, in the classic “word count” example, words are represented such that the key is the word string, and the value is the number of occurrences of that word:

Example data:
(cat, 10)
(dog, 8)
(illinois, 2)
(the, 16)
(a, 32)

We also don’t need to use both fields. A common practice in map/reduce jobs is to disregard either the key or the value during certain stages of the job. For example, when writing the map function in “word count”, the map output sets the value of each word to 1, as a somewhat arbitrary placeholder.

Map

The Map phase runs some procedure on the data that transforms or filters the input data. Map iterates over the input key/value tuples, and generates a stream of output key/value tuples that are the result of applying the map function to the input data.

As an example, we may have a map function that takes a list of integers, and returns each integer that is above 10:

input: 23, 8, -2, 53, -7, 25, 35
output: 23, 53, 25, 35
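
As a minimal sketch of how such a filtering map could look in plain Python (a generator function, not an mrjob mapper), values that fail the test simply produce no output:

# A sketch of a filtering map as a plain Python generator (not mrjob).
# Values that fail the test produce no output at all.
def filter_map(values):
    for v in values:
        if v > 10:
            yield v

print(list(filter_map([23, 8, -2, 53, -7, 25, 35])))  # [23, 53, 25, 35]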

Or, we could have a map function that selects words that start with the letter ‘c’:

input: 'cat', 'dog', 'mouse', 'chicken', 'wolf'
output: 'cat', 'chicken'

The above are examples of “filtering” maps. However, we can have maps that do transformation tasks as well. Consider this map function that takes a sentence as an input and yields the constituent words as its output:

input: 'the cat runs', 'the dog barks'
output: 'the', 'cat', 'runs', 'the', 'dog', 'barks'

From this example, we can see that a Map function can yield multiple outputs per call. In pseudocode, this may look like:

# Here, key is our sentence, and we disregard value
def map(key, value):
    for word in split_sentence_to_words(key):
        yield (word, 1)

Reduce

The Reduce phase collects keys and aggregates their values into some type of summary of the data. The standard example used to demonstrate this programming approach is a word count problem, where words (or tokens) are the keys and the number of occurrences of each word (or token) is the value.

The MapReduce framework structures the data in the reduce task such that each call to reduce is given a key, and an iterable (i.e. a list or iterator) of all the values associated with that key. Thus, the function signature of a reduce task looks like this:

def reduce(key, values):
    pass

In the computation stage between the map task and the reduce task, MapReduce performs a “shuffle and sort”. The “shuffle” is the process of transmitting map output data from mappers to reducers: it groups the map output tuples by key and carries all values for a given key to a single reducer, so that reducer has access to all data for that key. The “sort” orders the intermediate data by key, so each reducer receives its keys in sorted order, and the iterator handed to each reduce call holds all of that key’s values. This grouping-and-sorting property can be extremely useful for some workloads.
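
To make the shuffle step concrete, here is a rough local simulation in plain Python (not part of mrjob) of how map output could be grouped by key before being handed to the reducers:

from itertools import groupby
from operator import itemgetter

# Hypothetical map output from the sentence-splitting example above
map_output = [('the', 1), ('cat', 1), ('runs', 1), ('the', 1), ('dog', 1)]

# "Shuffle and sort": order the tuples by key, then group values by key.
# Each (key, values) pair is what a single reduce call would receive.
for key, group in groupby(sorted(map_output, key=itemgetter(0)), key=itemgetter(0)):
    values = [v for _, v in group]
    print(key, values)   # e.g. the [1, 1]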

The MapReduce technique was popularized by large web search companies like Google and Yahoo, which were processing large quantities of unstructured text data, and it quickly caught on for a wide range of problems. The standard MapReduce implementation is Hadoop, which is written in Java. However, to introduce you to this topic without the extra overhead of learning Hadoop’s idiosyncrasies, we will be simulating a map/reduce workload in pure Python using Yelp’s mrjob library.


Example: Word Count

This example displays the type of programs we can build from simple map/reduce functions. Suppose our task is to come up with a count of the occurrences of each word in a large set of text. We could simply iterate through the text and count the words as we saw them, but this would be non-parallelizable. Instead we will use the MapReduce framework.

import re
from mrjob.job import MRJob

WORD_REGEX = re.compile(r"[\w']+")

class WordCount(MRJob):
    def mapper(self, _, val):
        for word in WORD_REGEX.findall(val):
            yield (word, 1)

    def reducer(self, key, vals):
        total_sum = 0

        # Iterate and count all occurrences of the word
        for _ in vals:
            total_sum += 1

        # Yield the word and number of occurrences
        yield key, total_sum

if __name__ == '__main__':
    WordCount.run()

This illustrates a simple MRJob class. We define a mapper function and a reducer function. We use the yield keyword to emit key/value tuples (the yield keyword is distinct from the return keyword, in that a function can continue executing after yield, whereas return terminates the function).
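
For instance, here is a tiny stand-alone example (not part of the job above) showing the difference:

# yield lets the function keep producing values...
def yields_many():
    yield 1
    yield 2      # execution continues after each yield

# ...whereas return ends the function immediately.
def returns_once():
    return 1
    return 2     # never reached

print(list(yields_many()))   # [1, 2]
print(returns_once())        # 1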

Run python word_count.py data/lorem_ipsum.txt to perform word count on the Lorem Ipsum text we’ve provided.

Later, we will run jobs like this on our course’s Hadoop cluster.

Problem 1: Bigram Count

Suppose now that instead of counting individual words, we want to count the occurrences of word bigrams - that is, pairs of words that are adjacent to each other in the text. (Bigrams are only adjacent pairs, not all possible pairs of words in the text.)

For example, if our line of text was "cat dog sheep horse" then we’d have the bigrams (“cat”, “dog”), (“dog”, “sheep”), and (“sheep”, “horse”).

Construct a map function and reduce function that will accomplish this goal. Output your bigrams using a comma as a separator. (i.e. the “cat dog” bigram becomes “cat,dog”)

Note: For the purposes of this exercise, we’ll only consider bigrams that occur on the same line. So, you don’t need to worry about pairs that occur between line breaks.

Example:

Input: (stdin)

a man a plan a canal panama
there was a plan to build a canal in panama
in panama a canal was built

Output: (stdout)

"a,canal"   3
"a,man" 1
"a,plan"    2
"build,a"   1
"canal,in"  1
"canal,panama"  1
"canal,was" 1
"in,panama" 2
"man,a" 1
"panama,a"  1
"plan,a"    1
"plan,to"   1
"there,was" 1
"to,build"  1
"was,a" 1
"was,built" 1

Note that the order of the output is not important in this exercise; however, you must format your output keys so they match the above output.

Also, be sure to use the regex [\w']+ as defined in the word-count example when splitting text into words.
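
For example, applying that regex to the first input line above splits it into words like this (a quick check you can run in a Python shell):

import re

WORD_REGEX = re.compile(r"[\w']+")
print(WORD_REGEX.findall("a man a plan a canal panama"))
# ['a', 'man', 'a', 'plan', 'a', 'canal', 'panama']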

Testing your code

The following command will run your code and send the output to a file called problem1_out.txt:

python bigram_count.py data/lorem_ipsum.txt > problem1_out.txt

Problem 2: Count Inbound Wikipedia Article Links

In data/wikipedia_paths.txt, you will find a list of user-generated Wikipedia sessions. This data was derived from the SNAP Wikipedia dataset. Each line represents a user session, and shows the links that they clicked through to visit various articles.

For example:

Computer_programming;Linguistics;Culture;Popular_culture

Interpret this as: The user started on the Computer_programming article, then clicked a link to view Linguistics, then clicked a link to view Culture, and so on.

Note that users also have the option to “back click”, or go back to a previous page. This is denoted by a “<” entry.

Thus, we would interpret the following session:

Sun;Comet;Comet_Halley;<;Comet_Hale-Bopp;Great_comet

… as the user going from Sun -> Comet -> Comet_Halley --Back-Track-> Comet -> Comet_Hale-Bopp -> Great_comet.

From this line, we infer the following links:

Sun -> Comet
Comet -> Comet_Halley
Comet -> Comet_Hale-Bopp
Comet_Hale-Bopp -> Great_comet

Note that a user can “back click” multiple times:

Animal;Rodent;Mouse;<;<;Feline;Cat;<;<;Canine;Dog

… yields:

Animal -> Rodent
Rodent -> Mouse
Animal -> Feline
Feline -> Cat
Animal -> Canine
Canine -> Dog

Your task is to infer the number of inbound links to each Wikipedia article based on the traffic data we’ve provided. For the purposes of this problem, we assume that if a user clicks from page A to page B, then there exists a link on Wikipedia from page A -> B. For every page B, we want to find the number of unique pages A that link to B. (i.e. even if the dataset contains 100 sessions implying a link from Sun -> Comet, this should only increase Comet’s inbound link count by 1)

Output your results in the key/value format shown in the example output below.


Example:

Example Input:

animal;mammal;dog;olfaction
animal;mammal;cat;olfaction
animal;vertebrate;hagfish;<;mammal;dog;olfaction
animal;vertebrate;central_nervous_system;spinal_canal;vertebra;mammal;fur;fox;carnivora;<;<;<;porcupine;mammal

Example Output:

"carnivora" 1
"cat"   1
"central_nervous_system"    1
"dog"   1
"fox"   1
"fur"   1
"hagfish"   1
"mammal"    4
"olfaction" 2
"porcupine" 1
"spinal_canal"  1
"vertebra"  1
"vertebrate"    1

Testing your code

The following command will run your code and send the output to a file called problem2_out.txt:

python wikipedia_links.py data/wikipedia_paths.txt > problem2_out.txt

Problem 3: Mutual Twitter Followers

In this problem, you are given a list of Twitter follower relationships, and we want to determine pairs of users A and B such that A follows B and B follows A.

A row in the given dataset, a b, implies that the user with id a follows the user with id b.

Example data:

1 2
1 3
1 4
2 1
2 3
3 6
4 1

Your task is to find all mutual follower relationships. However, to make this more interesting, we also want our output to only contain users that are followed by 10 or more other users.

To do this, you will need to use a multi-stage MapReduce job. You can read in detail about how to define a multi-stage MapReduce job in the mrjob documentation.

In essence, you must override the steps() method on your MRJob class so that it returns a list of MRStep objects, each of which defines an individual Map/Reduce step; the output of each step becomes the input to the next:

from mrjob.job import MRJob
from mrjob.step import MRStep

class ExampleMultiStage(MRJob):
    def mapper1(...):
        ...

    def reducer1(...):
        ...

    def mapper2(...):
        ...

    def reducer2(...):
        ...

    def steps(self):
        return [
            MRStep(mapper=self.mapper1, reducer=self.reducer1),
            MRStep(mapper=self.mapper2, reducer=self.reducer2)
        ]

Problem Description: Write a multi-stage MapReduce job that outputs all pairs a b such that a follows b, b follows a, a is followed by 10 or more users, and b is followed by 10 or more users.

Output your results in the format shown in the example output below.

Example:

Example Input:

0 1
0 2
0 3
1 0
2 0
3 0
4 0
5 0
6 0
7 0
8 0
9 0
10 0
10 11
11 1
12 1
13 1
14 1
15 1
16 1
17 1
18 1
19 1

Example Output:

0   1

Testing your code

The following command will run your code and send the output to a file called problem3_out.txt:

python twitter_followers.py data/twitter_followers.txt > problem3_out.txt

Submission

MP 1 is due on Tuesday, January 30th, 2018 at 11:59PM.

You can find starter code and the Git repository where you should submit your code here:

If you have issues accessing your repository, make sure that you’ve signed into Gitlab. If you still do not have access, please make a private post on the course Piazza.

Please write your solutions in the following files:

bigram_count.py
wikipedia_links.py
twitter_followers.py

… and commit your code to Gitlab like this:

git add .
git commit -m "Completed MP" # Your commit message doesn't need to match this
git push

WARNING: Our autograder runs on Python 3. If you write code for this assignment that only works on Python 2, we will be unable to grade it. Code that contains syntax errors (i.e. code that cannot be parsed by the Python interpreter) will receive no credit.