CS61C Fall 2017 Lab 11: MapReduce and Spark

Goals

Setup

Copy the lab files from the instructional servers to your lab account with

$ cp -r ~cs61c/labs/11/ ~/labs/11/

You will also be working with Spark (in Python!), so you may need to brush up a bit on your Python!

IMPORTANT: to be able to run Spark, you must create a virtual environment that uses the correct version of Python. You can do this as follows:

$ conda create --name lab11env python=2.7

Respond to the prompt to install packages with "y" (with no quotes). You might have to wait for a bit for these to install. Finally, run the following command to activate the virtual environment:

$ source activate lab11env 

This will put you in the virtual environment needed for this lab. Please remember that if you exit the virtual environment and want to return to work on the lab, you must re-run "source activate lab11env" first for Spark to work.

Background Information

In lecture, we introduced cluster computing (in particular, the MapReduce framework) and how it is set up and executed. Now it's time to get some hands-on experience running programs with a cluster computing framework!

In this lab, we will be introducing you to a cluster computing framework called Spark. Spark was developed right here at Berkeley before being donated to the Apache Software Foundation in 2013. We will be writing Python code to run in Spark to give us some practice writing Map and Reduce routines.

Spark has its own website, so you are free to try installing it on your local machine, although it may be easier to ssh into the lab computers to complete this lab.

Avoid Global Variables

When using Spark, avoid using global variables! This defeats the purpose of having multiple tasks running in parallel and creates a bottleneck when multiple tasks try to access the same global variable. As a result, most algorithms will be implemented without the use of global variables.
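To see why this matters, consider the following minimal sketch (the names and data here are ours, not part of the lab skeleton). On a cluster, each task receives its own copy of any driver-side variable a function references, so updates made inside a transformation never make it back to the driver; keep all aggregation inside Spark transformations such as reduceByKey instead.

from pyspark import SparkContext

sc = SparkContext("local[*]", "globals-pitfall-sketch")
words = sc.parallelize(["speech", "press", "speech", "religion"])

total = 0  # driver-side global

def bad_count(word):
    # Each worker increments its own copy of `total`; the driver's copy
    # is never updated, so this count is silently lost.
    global total
    total += 1
    return (word, 1)

words.map(bad_count).collect()
print(total)  # still 0 here in the driver

# Parallel-friendly version: keep all state inside the RDD operations.
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
print(counts.collect())  # e.g. [('press', 1), ('religion', 1), ('speech', 2)] (order may vary)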

Documentation and Additional Resources

Exercises

Note: Some exercises can (or must) be solved by reconsidering how map(), flat_map(), and reduce() are implemented, how they are called, and in what order, so keep this in mind when deciding which of them to use.

The following exercises use three different sample input files, two of which are provided by the staff and can be found in ~cs61c/data:

  1. billOfRights.txt.seq -- the 10 Amendments split into separate documents (a very small input)
  2. complete-works-mark-twain.txt.seq -- The Complete Works of Mark Twain (a medium-sized input)

Notice the .seq extension, which signifies a sequence file that is readable by Spark. These are NOT human-readable. Spark supports other input formats, but you will not need to worry about that for this lab. To get a sense of the texts you'll be using, simply drop the .seq portion to view the text file (e.g. ~cs61c/data/billOfRights.txt).
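If you are curious what Spark actually sees, a sequence file loads directly into an RDD of (key, value) pairs. Here is a minimal sketch (the path below is illustrative; point it at wherever your .seq file lives):

from pyspark import SparkContext

sc = SparkContext("local[*]", "seqfile-peek-sketch")
docs = sc.sequenceFile("billOfRights.txt.seq")  # RDD of (document-id, document-text) pairs
doc_id, text = docs.first()
print(doc_id)     # an arbitrary document key
print(text[:80])  # the first 80 characters of that document's text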

Although an exercise may not explicitly ask you to use it, we recommend testing your code on the billOfRights data set first in order to verify correct behavior and help you debug.

We recommend deleting output directories when you have completed the lab, so you don't run out of your 500MB of disk quota. You can do this by running:

$ make destroy-all

Please be careful with this command as it will delete all outputs generated in this lab.

Exercise 0: Generating an Input File for Spark

For this exercise you will need the Makefile and Importer.java. In this lab, we'll be working heavily with textual data. We have some pre-generated datasets as indicated above, but it's always more fun to use a dataset that you find interesting. This section of the lab will walk you through generating your own dataset using works from Project Gutenberg (a database of public-domain literary works).

Step 1: Head over to Project Gutenberg, pick a work of your choosing, and download the "Plain Text UTF-8" version into your lab directory.

Step 2: Open up the file you downloaded in your favorite text editor and insert "---END.OF.DOCUMENT---" (without the quotes) by itself on a new line wherever you want Spark to split the input file into separate (key, value) pairs. The importer we're using will assign an arbitrary key (like "doc_xyz") and the value will be the contents of our input file between two "---END.OF.DOCUMENT---" markers. You'll want to break the work into reasonably-sized chunks, but don't spend too much time on this part (chapters/sections within a single work or individual works in a body of works are good splitting points).
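For example, a marked-up input might look like the following (the surrounding text is invented; only the marker line matters):

    ...and so ends the first chapter.
    ---END.OF.DOCUMENT---
    Chapter 2 opens on a stormy night...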

Step 3: Now, we're going to run our Importer to generate a .seq file that we can pass into the Spark programs we'll write. The importer is actually a MapReduce program, written using Hadoop! You can take a look at Importer.java if you want, but the implementation details aren't important for this part of the lab. You can generate your input file like so:

$ make generate-input myinput=YOUR_FILE_FROM_STEP_2.txt

Your generated .seq file can now be found in the convertedOut directory in your lab11 directory. Throughout the rest of this lab, you'll be able to run the MapReduce programs we write using make commands. The make commands will be of the form make PROGRAMNAME-INPUTSIZE. If you wish to try out the input file you generated here, you can instead run:

$ make PROGRAMNAME myinput=YOUR_SEQ_FILE_FROM_STEP_3.txt.seq # Output in wc-out-PROGRAMNAME/ directory

Exercise 1: Running Word Count

For this exercise you will need the Makefile and already-completed wordcount.py. You can run it on our desired input using a convenient make command:

$ make sparkwc-small

This will run wordcount.py over billOfRights.txt.seq. Your output should be visible in spark-wc-out-wordcount-small/part-00000.

Next, try your code on the larger input file complete-works-mark-twain.txt.seq. One interesting feature of Spark is that it is an in-memory distributed computing engine, so it has no default file storage. Instead, we use the Hadoop Distributed File System (HDFS) to help store our output. You are welcome to read more about Hadoop on your own. In general, Hadoop requires that the output directory not exist when a MapReduce job is executed; however, our Makefile takes care of this by removing the old output directory for you.

$ make sparkwc-medium

Your output for this command will be located in the spark-wc-out-wordcount-medium directory. Search through the file for a word like "the" to get a better understanding of the output.
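wordcount.py is provided for you, but it helps to have a mental model of what a Spark word count does. Here is a minimal standalone sketch (this is not the staff skeleton; the input path and output directory are illustrative):

from pyspark import SparkContext

sc = SparkContext("local[*]", "wordcount-sketch")
docs = sc.sequenceFile("billOfRights.txt.seq")  # (document-id, document-text) pairs

counts = (docs
          # one (word, 1) pair for every word occurrence in every document
          .flatMap(lambda kv: [(word, 1) for word in kv[1].split()])
          # sum the 1s for each word
          .reduceByKey(lambda a, b: a + b)
          # alphabetical order by word
          .sortByKey())

counts.saveAsTextFile("spark-wc-out-wordcount-sketch")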

Exercise 2: Document Word Count

Open docwordcount.py. Notice that it currently contains the same code as wordcount.py, which you just tried for yourself. Modify it to count the number of documents containing each word rather than the number of times each word occurs in the input.

To help you understand the code, we have added some comments, but feel free to check out transformations and actions on the Spark website for a more detailed explanation of some of the methods that can be used in Spark.

In this part, you may find it useful to look at the transformations and actions link provided above, as there are methods that you can use to help sort the output or remove duplicate items. To distinguish which documents a word appears in, you may want to make use of the document ID as well -- this is mentioned in the comments of flat_map.

Finally, make sure the output is sorted in alphabetical order. (Hint: Is there another transformation you can use?)
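If you get stuck, here is a sketch of one possible approach written directly with RDD transformations (the staff skeleton structures this differently, splitting the work across helper functions that you fill in, so adapt the idea rather than copying it):

from pyspark import SparkContext

sc = SparkContext("local[*]", "docwordcount-sketch")
docs = sc.sequenceFile("billOfRights.txt.seq")  # (document-id, document-text) pairs

counts = (docs
          # one (word, document-id) pair per word occurrence
          .flatMap(lambda kv: [(word, kv[0]) for word in kv[1].split()])
          # keep each (word, document-id) pair only once, so a document
          # contributes at most 1 to each word's count
          .distinct()
          # count the number of distinct documents per word
          .map(lambda pair: (pair[0], 1))
          .reduceByKey(lambda a, b: a + b)
          # alphabetical order by word
          .sortByKey())

counts.saveAsTextFile("spark-wc-out-docwordcount-sketch")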

You can test docwordcount.py using either of the following (for our two data sets):

$ make sparkdwc-small  # Output in spark-wc-out-docwordcount-small/

OR

$ make sparkdwc-medium # Output in spark-wc-out-docwordcount-medium/

Check-off

Exercise 3: Full Text Index Creation

Open index.py. Notice that the code is similar to docwordcount.py. Modify it to output every word and a list of locations (document identifier followed by the word index of EACH time that word appears in that document). Make sure your word indices start at zero. Your output should have lines that look like the following (minor line formatting details don't matter):

(word1	document1-id, word# word# ...)
(word1	document2-id, word# word# ...)
. . .
(word2	document1-id, word# word# ...)
(word2	document3-id, word# word# ...)
. . .

Notice that there will be a line of output for EACH document in which a word appears, and EACH word and document pair should have only ONE list of indices. Remember that you also need to keep track of the document ID.

For this exercise, you may not need all the functions we have provided. If a function is not used, feel free to remove the method that is trying to call it. Make sure your output for this exercise is sorted as well (just like in the previous exercise).
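If you want a starting point, here is a sketch of one possible approach using RDD transformations directly (again, not the staff skeleton; the path and output directory are illustrative, and you should adapt the idea to the provided functions):

from pyspark import SparkContext

sc = SparkContext("local[*]", "index-sketch")
docs = sc.sequenceFile("billOfRights.txt.seq")  # (document-id, document-text) pairs

def emit_positions(kv):
    doc_id, text = kv
    # ((word, doc_id), [position]) for every occurrence; positions start at 0
    return [((word, doc_id), [i]) for i, word in enumerate(text.split())]

index = (docs
         .flatMap(emit_positions)
         # merge the position lists for each (word, doc_id) pair
         .reduceByKey(lambda a, b: a + b)
         # reshape to (word, (doc_id, sorted position list)) and sort by word
         .map(lambda kv: (kv[0][0], (kv[0][1], sorted(kv[1]))))
         .sortByKey())

index.saveAsTextFile("spark-wc-out-index-sketch")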

You can test index.py using either of the following commands (for our two data sets):

$ make index-small # Output in spark-wc-out-index-small/

OR

$ make index-medium # Output in spark-wc-out-index-medium/

The output from running make index-medium will be a large file. In order to more easily look at its contents, you can use the commands cat, head, more, and grep:

$ head -25 OUTPUTFILE       # view the first 25 lines of output
$ cat OUTPUTFILE | more     # scroll through output one screen at a time (use Space)
$ cat OUTPUTFILE | grep the # output only lines containing 'the' (case-sensitive)

Make sure to verify your output. Open complete-works-mark-twain.txt and pick a few words. Manually count a few of their word indices and make sure they all appear in your output file.

Check-off

  1. Explain your code in index.py to your TA.
  2. Show your TA the first page of your output for the word "Mark" in complete-works-mark-twain.txt.seq to verify correct output. You can do this by running: cat spark-wc-out-index-medium/part-00000 | grep Mark | less