CS61A Lab 13: MapReduce

Spring 2013

To get the files necessary for this lab (such as mr.py), let's copy the relevant lab files:

~ # cd
~ # cp -r ~cs61a/public_html/sp13/labs/lab13/lab13 .
~ # cd lab13

For this lab, we'll be using MapReduce, a programming paradigm developed by Google, which allows a programmer to process large amounts of data in parallel on many computers. Hadoop is an open-source implementation of the MapReduce design.

Any computation in MapReduce consists primarily of two components: the mapper and the reducer.

The mapper takes an input file, and outputs a series of key-value pairs, like:

age 29
name cecilia
job gradstudent
salary 42

In the above, the key-value pairs are:

(age -> 29), (name -> cecilia),
(job -> gradstudent), (salary -> 42)

The reducer takes the mapper's output, sorted by key, and outputs a single value for each key.
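
To make the mapper/reducer split concrete, here is a minimal pure-Python sketch of the map, sort, reduce idea. It does not use mr.py or Hadoop. The function names (map_sort_reduce, parse_reading) and the sample city/temperature records are made up for illustration.

```python
from itertools import groupby
from operator import itemgetter

def map_sort_reduce(records, mapper, reducer):
    """Map every record to (key, value) pairs, sort by key, reduce each key's values."""
    # Map: each record may produce any number of (key, value) pairs.
    pairs = [pair for record in records for pair in mapper(record)]
    # Sort: bring pairs with the same key next to each other.
    pairs.sort(key=itemgetter(0))
    # Reduce: collapse the values of each key into a single value.
    return {key: reducer(value for _, value in group)
            for key, group in groupby(pairs, key=itemgetter(0))}

def parse_reading(line):
    # Hypothetical mapper: "city temperature" -> (city, temperature)
    city, temperature = line.split()
    yield city, int(temperature)

records = ["berkeley 61", "oakland 64", "berkeley 58"]
result = map_sort_reduce(records, parse_reading, max)
print(result)
```

Here the reducer is the built-in max, so the result holds the highest reading seen for each city.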

The entire MapReduce pipeline can be summarized by the following diagram:

[Figure: The MapReduce Pipeline]

Your job will be to write the mapper and the reducer.

Example: Line-counting with Shakespeare and Unix

Let's see an example of using the MapReduce idea to count the number of lines in Shakespeare's works.
On our servers, we happen to have all of Shakespeare's plays in text format. For instance, if you're so inclined, feel free to read a few phrases from 'Romeo and Juliet':

 # emacs ~cs61a/lib/shakespeare/romeo_and_juliet.txt & 
Or how about...'The Tempest'?
 # emacs ~cs61a/lib/shakespeare/the_tempest.txt & 
Anyway, we'd like to be able to count all the lines in all of his plays. Choose a Shakespeare play (say, the_tempest.txt) and copy it into your current directory by doing:
 # cp ~cs61a/lib/shakespeare/the_tempest.txt . 

To formulate this as a MapReduce problem, we need to define an appropriate mapper and reducer function.
One way to do this is to have the mapper create a key-value pair for every line in each play, whose key is always the word 'line', and whose value is always 1.
The reducer would then simply sum all the values, as this picture illustrates:

[Figure: Line-counting in MapReduce]
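
Before touching any files, the same formulation can be tried on a few lines in plain Python. This is only a sketch of the idea: the helper names (line_count_pairs, total) and the sample lines are invented here and are not part of the lab files.

```python
def line_count_pairs(lines):
    """Mapper idea: one ('line', 1) pair per input line."""
    return [('line', 1) for _ in lines]

def total(pairs):
    """Reducer idea: add up the values (every pair shares the key 'line')."""
    return sum(value for _, value in pairs)

sample = ["first line", "second line", "third line"]  # made-up input
pairs = line_count_pairs(sample)
count = total(pairs)
print(pairs)
print(count)
```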

Let's implement each feature (mapper, reducer) separately, then see how each piece fits together.

The Mapper: line_count.py

In your current directory should be a file line_count.py with the following body:


#!/usr/bin/env python3

import sys
from ucb import main
from mr import emit

@main
def run():
    # Emit one ('line', 1) pair for every line read from stdin.
    for line in sys.stdin:
        emit('line', 1)

line_count.py is the mapper, which takes input from stdin (i.e. 'standard in') and outputs one key-value pair for each line to stdout (i.e. 'standard out', which is typically the terminal output).
Let's try running line_count.py by feeding it the_tempest.txt. The question is, how do we give the_tempest.txt to line_count.py via stdin? We'll use the Unix pipe feature '|'. (Note: the pipe character '|' isn't a lowercase 'L'; it's typically typed with Shift+Backslash.)

Note: You will probably have to tell Unix to treat line_count.py as an executable by issuing the following command:

 # chmod +x line_count.py 
Once you've made line_count.py executable, type in the following command:
 # cat the_tempest.txt | ./line_count.py 
Recall that the cat program will display the contents of a given file to stdout.
If you've completed line_count.py correctly, your terminal output should be full of key-value pairs, looking something like:
'line' 1
'line' 1
'line' 1
'line' 1
What piping does in Unix is take the output of one program (in this case, the cat program), and 'pipe' it into the input of another program (typically via stdin). This technique of piping programs together is ubiquitous in Unix-style programming, and is a sign of modular programming. The idea is: if you can write modular programs, then it will be easy to accomplish tasks by chaining together multiple programs. We'll do more with this idea in a moment.
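
A mapper like this can also be checked without a shell pipe, by handing it a fake stdin and capturing what it prints. The sketch below is self-contained: emit_sketch is a hypothetical stand-in for mr.py's emit (assumed here to print the key's repr followed by the value, matching the output shown above), and run_mapper mirrors the body of run().

```python
import io
from contextlib import redirect_stdout

def emit_sketch(key, value):
    # Hypothetical stand-in for mr.emit; the real function may format differently.
    print(repr(key), value)

def run_mapper(stream):
    # Same loop as the mapper: one pair per line of input.
    for _ in stream:
        emit_sketch('line', 1)

fake_stdin = io.StringIO("first line\nsecond line\n")  # plays the role of cat's output
captured = io.StringIO()
with redirect_stdout(captured):
    run_mapper(fake_stdin)
output = captured.getvalue()
```

The string output now holds exactly what the next program in a pipe would have received on its stdin.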

The Reducer: sum.py

In your current directory should be the file sum.py. The body of this file should be:


#!/usr/bin/env python3

import sys
from ucb import main
from mr import values_by_key, emit

@main
def run():
    # For each key, emit the sum of all values that were paired with it.
    for key, value_iterator in values_by_key(sys.stdin):
        emit(key, sum(value_iterator))

This is the reducer, which reads in sorted key-value pairs from stdin, and outputs a single value for each key. In this case, sum.py will return the sum of all the values for a given key. In other words, the reducer is reducing the values of a given key into a single value.
The emit procedure takes two arguments, a key and a reduced_value, and performs the necessary bookkeeping so that Hadoop knows we are combining all of the mapper's key-value pairs (here, read from stdin) that share that key into the single value reduced_value.
For the purposes of this simple line-counter, since the mapper only returns one type of key ('line'), the reducer will also only return one value - basically the total number of key-value pairs.
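
The lab does not show how values_by_key works, so the following is only a guess at the idea, not the code in mr.py. It assumes each input line looks like the mapper output shown earlier (a quoted key, whitespace, then a value) and that the lines are already sorted by key. The names parse_pair and sketch_values_by_key are hypothetical.

```python
from ast import literal_eval
from itertools import groupby

def parse_pair(line):
    # Assumed line format: "'line' 1" (repr of the key, whitespace, the value).
    key_text, value_text = line.strip().rsplit(None, 1)
    return literal_eval(key_text), literal_eval(value_text)

def sketch_values_by_key(lines):
    """Yield (key, iterator of values) for each run of lines sharing a key."""
    pairs = (parse_pair(line) for line in lines if line.strip())
    for key, group in groupby(pairs, key=lambda pair: pair[0]):
        yield key, (value for _, value in group)

sorted_lines = ["'a' 1\n", "'a' 2\n", "'b' 5\n"]
totals = [(key, sum(values)) for key, values in sketch_values_by_key(sorted_lines)]
print(totals)
```

Each value iterator must be consumed before moving on to the next key, which is what sum does here.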

Putting it all together

Now that we have the mapper and the reducer defined, let's put it all together in the (simplified) MapReduce framework:

[Figure: The MapReduce Flow (map, sort, reduce)]

Note: You will probably have to tell Unix to treat sum.py as an executable by issuing the following command:

 # chmod +x sum.py 
Once you've done this, issue the following Unix command:
 # cat the_tempest.txt | ./line_count.py | sort | ./sum.py 
Notice that we're using sort, a standard Unix program. As you'd expect, sort sorts the lines of its input (here, the mapper's output arriving through the pipe); by default, it sorts them alphabetically.
Take a moment and make sure you understand how the above Unix command is exactly the MapReduce framework (Map -> Sort -> Reduce). What's neat is that, in a very simple manner, we executed the MapReduce idea of using mappers and reducers to solve a problem. However, the main benefit of using the MapReduce idea is to take advantage of distributed computing - don't worry, we'll do that soon!
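
Why is the sort step needed at all? A reducer that reads its input as a stream can only combine pairs whose keys sit next to each other. The sketch below illustrates this with itertools.groupby, assuming that the reducer groups adjacent equal keys; the function name and sample pairs are made up.

```python
from itertools import groupby

def reduce_by_adjacent_key(pairs):
    """Sum values over runs of adjacent equal keys, as a streaming reducer would."""
    return [(key, sum(value for _, value in group))
            for key, group in groupby(pairs, key=lambda pair: pair[0])]

mapper_output = [('b', 1), ('a', 1), ('b', 1), ('a', 1), ('b', 1)]
unsorted_result = reduce_by_adjacent_key(mapper_output)
sorted_result = reduce_by_adjacent_key(sorted(mapper_output))
print(unsorted_result)
print(sorted_result)
```

Without sorting, each key shows up several times with partial sums; after sorting, every key appears exactly once with its full total.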


Question 1

Use the MapReduce framework (i.e. Map -> Sort -> Reduce) to count the number of times the following (common) words occur:

A question to ponder is: will you need to create a new mapper, a new reducer, or both?

MapReduce with Hadoop

We have provided a way to practice making calls to the MapReduce framework (using the Hadoop implementation). The provided file mr.py will take care of the details of communicating with Hadoop via Python. Here is a list of commands that you can give to mr.py:

Note: Some terminology. The Hadoop framework, for its own reasons, maintains its own filesystem separate from the filesystems your instructional accounts are on. As such, the following Hadoop filesystem commands are performed with respect to your Hadoop filesystem.

To use the distributed-computing power, you'll need to SSH into the icluster servers (Hadoop is installed only on these machines). To do this, issue the following terminal command:

# ssh -X icluster1.eecs.berkeley.edu
You will be prompted to log in.

Then, some environment variables need to be set - issue the following Unix command:

 source lab13_envvars 


 #  python3 mr.py cat OUTPUT_DIR 

This command prints out the contents of all files in one of the directories on the Hadoop FileSystem owned by you (given by OUTPUT_DIR).


 # python3 mr.py ls 

This command lists the contents of all output directories on the Hadoop FileSystem.


 # python3 mr.py rm OUTPUT_DIR 

This command will remove an output directory (and all files within it) on the Hadoop FileSystem. Use this with caution - remember, there's no 'undo'!



This command will run a MapReduce job of your choosing, where:

Example: Line-counting with Hadoop

Now, make sure that your line_count.py, sum.py, and mr.py are in the current directory, then issue the command:
# python3 mr.py run line_count.py sum.py ../shakespeare.txt mylinecount 
Your terminal should then be flooded with the busy output of Hadoop doing its thing. Once it's finished, you'll want to examine the Hadoop results! To do this, first call mr.py's ls command to see the contents of your Hadoop directory:
 # python3 mr.py ls 
You should see a directory listing for your mylinecount job. To view the results of this job, we'll use mr.py's cat command:
 # python3 mr.py cat mylinecount/part-00000 
As an interesting reference point, one TA ran this MapReduce job on a lightly-loaded icluster1, but totally in serial, meaning that each map job had to be done sequentially. The total line_count job took on the order of 5-8 minutes. How much faster was it to run it with Hadoop using distributed computing, where the work can be done in parallel?
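
To get a feel for why the work parallelizes, note that each chunk of input can be counted without looking at any other chunk. The sketch below is not how Hadoop schedules jobs; it only imitates the structure on one machine using Python's ThreadPoolExecutor. The chunk contents and function names are invented, and threads in Python will not necessarily make this faster.

```python
from concurrent.futures import ThreadPoolExecutor

def count_lines(chunk):
    """One map task: count the lines in one chunk of input."""
    return len(chunk)

def parallel_line_count(chunks, workers=4):
    # Each chunk is counted independently, so the tasks can run in any order.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partial_counts = list(pool.map(count_lines, chunks))
    # Combining the partial counts plays the role of the reduce step.
    return sum(partial_counts)

chunks = [["a", "b"], ["c"], ["d", "e", "f"]]  # made-up chunks of lines
total_lines = parallel_line_count(chunks)
print(total_lines)
```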


Question 2:

Take your solution from Question 1 and run it through the distributed MapReduce (i.e. by using mr.py) to discover the number of occurrences of the following words in the entirety of Shakespeare's works:


Question 3a:

One common MapReduce application is a distributed word count. Given a large body of text, such as the works of Shakespeare, we want to find out which words are the most common.
Write a MapReduce program that returns each word in a body of text paired with the number of times it is used. For example, calling your solution with ../shakespeare should output something like:

the 300
was 249
thee 132

Note: These aren't the actual numbers.

You probably will need to write a mapper function. Will you have to write a new reducer function, or can you re-use a previously-used reducer?

Working with the Trends Project

We've included a portion of the trends project, in the files trends.py and data.py, among the lab files that you copied at the beginning of the lab. We're going to calculate the total sentiment of each of Shakespeare's plays in much the same way that we calculated the total sentiment of a tweet in the trends project.

In order to do this, we need to create a new mapper. The skeleton for this new mapper is in the file sentiment_mapper.py. Fill in the function definition so that it emits the average sentiment of each line fed to it.

Note that we need to provide our code with the big sentiments.csv file. We've already stored this for you on the distributed file system that Hadoop uses. To make sure the file is available to our code, we use the "run_with_cache" command instead of the "run" command. It accepts one additional parameter: the path (on the virtual file system) to the cache file which contains the sentiments. Don't worry too much about this part; it's just the specifics of our implementation.

Long story short, we will use the following command to run this MapReduce task:

    python3 mr.py run_with_cache sentiment_mapper.py sum.py ../shakespeare MY_OUTFILE ../sentiments.csv#sentiments.csv

More Fun Exercises!

Question 3b:

Now, we will determine the most commonly used word. Write a Python script file most_common_word.py that, given the output of the program you wrote in Question 3a (via stdin), outputs the most commonly used word. The usage should look like this (assuming you named the Hadoop job output wordcounts):

# python3 mr.py cat wordcounts | python3 most_common_word.py 

Question 3c:

Now, write a Python script file get_singles.py that, given the MapReduce output from Question 3a (via stdin), outputs all words used only once, in alphabetical order. Finally, output the results into a text file singles.txt. The Unix command should look like this:

# python3 mr.py cat wordcounts | python3 get_singles.py | sort > singles.txt 

Question 4a:

In this question, you will write a MapReduce program that, given a phrase, outputs which play the phrase came from.
Then, use your solution to figure out which play each of the following famous Shakespeare phrases came from:

pomp and circumstance                       foregone conclusion
full circle                                 strange bedfellows
neither rime nor reason                     spotless reputation
one fell swoop                              seen better days
it smells to heaven                         a sorry sight

Hint: In your mapper, you'll want to use the get_file() helper function, which is defined in the mr.py file. get_file() returns the name of the file that the mapper is currently processing - for ../shakespeare, the filenames will be play names. To import get_file, include the following line at the top of your Python script:

from mr import get_file

Also, you might want to look at the included set.py reducer which reduces the values of each key into a set (i.e. removing duplicates).
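
The contents of set.py are not shown here, so the following is only a sketch of what "reducing the values of each key into a set" means, written in plain Python without mr.py. The function name reduce_to_sets and the sample pairs are hypothetical.

```python
def reduce_to_sets(pairs):
    """Collect the distinct values seen for each key."""
    result = {}
    for key, value in pairs:
        # A set keeps one copy of each value, so duplicates disappear.
        result.setdefault(key, set()).add(value)
    return result

pairs = [('k1', 'a'), ('k1', 'a'), ('k1', 'b'), ('k2', 'c')]  # made-up mapper output
distinct = reduce_to_sets(pairs)
print(distinct)
```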