CS61C Spring 2011 Project 1: MapReduce in the Cloud


Intermediate milestone due:  02/6@23:59:59 PST
Due:  02/13@23:59:59 PST


Clarifications



Overview

For this project, you're finally going to roll up your sleeves and use MapReduce to answer a big data problem.


The question you're going to answer is this: given a word, what other words are statistically associated with it? If I say 'love', or 'death', or 'terrorism', what other words and concepts go with it?


In particular, the statistical definition we're going to use is the following: 

    Let A_w be the number of occurrences of w in the corpus.

    Let C_w be the number of occurrences of w in documents that also have the target word.


For the target word, produce an ordered list of words sorted by:

   

   Co-occurrence rate =  C_w * (log(C_w))^3 / A_w    if C_w > 0

                         0                            otherwise



Your list should be ordered with the biggest co-occurrence rates at the top.
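
To make the formula concrete, here is the rate written out as plain Java. This is only a sketch: aW and cW are hypothetical variables standing in for A_w and C_w, and it assumes the logarithm is the natural log (Math.log).

    // Hypothetical sketch of the co-occurrence rate for one word w.
    // aW = number of occurrences of w in the whole corpus (A_w)
    // cW = number of occurrences of w in documents containing the target word (C_w)
    double rate = (cW > 0) ? cW * Math.pow(Math.log(cW), 3) / aW : 0.0;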


This isn't the most sophisticated text-analysis algorithm out there. But it's enough to illustrate what you can do with MapReduce.


The data will be the same Usenet corpus as in Lab 3 (and in turn, the same format as in Lab 2). Hence, the test data supplied for Lab 2 will also work here.


Do this project individually. You may not share code with other students or read other students' code. No exceptions.

     

Detailed Directions

We'll give you skeleton code for two MapReduce jobs and a Makefile in ~cs61c/proj1StartKit. You should copy that directory to your home directory (e.g. cp -R ~cs61c/proj1StartKit ./proj1) and then start modifying your copy of Proj1.java. Both jobs live in the same source file. That directory also includes some test data: billOfRights.militia.testResult and billOfRights.jury.testResult, which are the results of our reference implementation for the keywords "militia" and "jury", respectively. You should get the same output as we did.


We'll also give you a program (Importer.java) that you can use to convert text files to the appropriate sequence file format. This will let you create your own test data.


Your solution should implement the algorithm described above, with all code living in those two MapReduce jobs. Our reference solution defines no new classes or methods.  All the code lives inside map() and reduce(). You should not need to change any type signatures or edit anything in main.


Note that Hadoop guarantees that a reduce task receives its keys in sorted order. You may (should!) use this fact in your implementation.


It's possible that some Map or Reduce phases will be the identity function. This is the default behavior if you don't override the base class map() and reduce() (i.e., if you simply delete the map or reduce function).
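
For reference, the default map() in the new-API Mapper base class is essentially the identity pass-through below (the Reducer base class's default reduce() behaves analogously, writing each value straight through with its key):

    // Conceptually, the Mapper base class's map() just passes its input through.
    // (KEYIN, VALUEIN, KEYOUT, VALUEOUT are the Mapper's generic type parameters.)
    protected void map(KEYIN key, VALUEIN value, Context context)
            throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }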


As you parse the text, you should convert it to lower case. Have a look at the Javadoc for java.lang.String for a method that will help with this. You may also need to use Java's HashMap and Math classes.
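
For instance, the per-document processing inside a map() might look something like the sketch below. This is just one way to do it (text is a hypothetical String holding one document's contents); how you split words affects your numbers, so check your output against the supplied testResult files.

    // Hypothetical sketch: lower-case the text, then tally words with java.util.HashMap.
    String lower = text.toLowerCase();
    HashMap<String, Integer> counts = new HashMap<String, Integer>();
    for (String word : lower.split("\\s+")) {
        if (word.length() == 0) continue;            // skip empty tokens
        Integer prev = counts.get(word);
        counts.put(word, prev == null ? 1 : prev + 1);
    }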


We've defined Combiner functions for both jobs. These are disabled by default; you can turn them on by specifying "-Dcombiner=true" on the command line. You should try to write your code to make use of these combiners: it will make your job run faster, and we'll give you extra credit. It shouldn't be hard; our combiner implementation was just a handful of lines. But you can still get full credit without them.


The framework expects you to output (from the second job) a DoubleWritable and Text pair. These should be the score for each word and the word itself, respectively. (You can output -1 * co-occurrence rate, if that makes things simpler for you.)
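
Concretely, the final reduce() ends up emitting something like this (rate and word are hypothetical local variables):

    // Emit (score, word). Writing -rate is one way to make Hadoop's
    // ascending sort on keys put the largest co-occurrence rates first.
    context.write(new DoubleWritable(-rate), new Text(word));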


The whole corpus is inconveniently big, so we're just going to use the 2008 subset.


Running things locally

The way you should launch the jobs is via:


    hadoop jar proj1.jar Proj1 -DtargetWord=<target> <input> <intermediateDir> <outDir>


You should replace <target> with the target word for the analysis, and <intermediateDir> and <outDir> can be whatever paths you like.


The -D syntax sets a Hadoop configuration parameter. Inside your job, you can retrieve this parameter via context.getConfiguration().get("targetWord"). The framework we give you has this code already there for you.
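
For reference, the retrieval looks like this inside a Mapper or Reducer (again, the skeleton already does this for you):

    // Read the value passed as -DtargetWord=<target> on the command line.
    String targetWord = context.getConfiguration().get("targetWord");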


The intermediate directory will hold the output from the first MapReduce job, which is used as input for the second job. This may be helpful in debugging.


The framework we give you supports two other options. Specifying -Dcombiner=true will turn on the combiner. Specifying -DrunJob2=false will cause only the first job to run, and will cause its output to be in a friendly (Text) format instead of as sequence files. This is intended for debugging.

    


Make sure you delete the intermediate and final outputs between runs.



STOP.  You have now done enough work for the intermediate checkpoint, due on the 6th. For the intermediate milestone, submit the file Proj1.java using


    submit proj1-1



Before submitting, you should verify that the invoke command above causes your code to run correctly. We'll be using that command, or one very similar to it, for our grading.


We're going to grade your code by running it in an automated test harness on sample data. So make sure the invoke script that we gave you correctly invokes your code. We expect you to give [approximately] the right numeric answers, barring floating point roundoff. So don't try to improve on the formula we give you to get "better" results.  We promise to avoid tests that are overly sensitive to roundoff.


Running things in the cloud


We expect you to test your code in the cloud. This should be a fun experience, not a chore. You're making machines hundreds of miles away jump to your bidding. And you don't even have to pay for it. Berkeley, however, does have to pay for it, so please try to be cost-conscious.


For this assignment, we want you to run your job a total of six times in the cloud.

    You'll be exclusively using "large" instances. For each of the three keywords "Terrorism", "France", and "Clinton", run with 5 workers and then with 9 workers.  Be sure to save the output and the job status pages (so you can determine input size and runtime) for each run.


We estimate that each run with 5 workers will take around 15-16 minutes (12 minutes with a combiner).


Do not leave EC2 machines running when you are not using them. The virtual machines cost the same amount of money when they are active as when they are sitting idle.



The EC2 usage instructions are substantially the same as those in Lab 3.

   If you haven't already set up your account for EC2 usage, do so using:

        ec2-util --init

        new-ec2-certificate

        source ~/ec2-environment.sh


   To start a cluster with N workers, say:

        hadoop-ec2 launch-cluster --auto-shutdown=230 large N


   To redisplay the web interface URLs use:

        hadoop-ec2 list large


   You can then start the job on this cluster via: 

        hadoop-ec2 proxy large

        hc large jar proj1.jar Proj1 -DtargetWord=<target> s3n://cs61cUsenet <intermediateDir> <outDir>


    Remember to terminate your cluster when you're done.


For your intermediate directory, specify space on the cluster's distributed filesystem using hdfs:///NAME-YOU-CHOOSE . For your final output directory, specify space on S3 using s3n://cs61c/USERNAME/NAME-YOU-CHOOSE. (USERNAME should be your full course account username, starting with "cs61c-".) Later you can download the output files from S3 using

    s3cmd get -r s3://cs61c/USERNAME/NAME-YOU-CHOOSE

(We suggest putting your outputs on S3 so you can retrieve them after you terminate your cluster. You can list the contents of your directory on S3 using s3cmd ls s3://cs61c/USERNAME/.)
[Note: the use of "s3://..." with s3cmd and "s3n://..." with Hadoop is deliberate and necessary.]

When you're done with a cluster, be sure to shut it down via
        close_everything.sh
or
    hadoop-ec2 terminate-cluster large

Do not leave instances running when you are not using them. (You can always check whether you have instances running using 'hadoop-ec2 list' or 'ec2-my-instances'.)

For the final results, you should be submitting two files:  Your Proj1.java and the ec2experience.txt. Use the submission command

    submit proj1-2


For the final submission, answer the following questions in a file named ec2experience.txt:

  1. On the big dataset, what were the top 20 words by relevance for each of "Terrorism", "France", and "Clinton"?
  2. How long did each run of the program take on each number of instances?
  3. What was the median processing rate per GB (= 2^30 bytes) of input for the tests using 5 workers?  Using 9 workers?  (A worked example of this arithmetic, with made-up numbers, appears after this list.)
  4. What was the speedup from 5 workers to 9 workers?   What do you conclude about how well Hadoop parallelizes your work?
  5. What was the price per GB processed? (Recall that an extra-large instance costs $0.68 per hour, rounded up to the nearest hour.)
  6. How many dollars in EC2 credits did you use to complete this project?
  7. Extra credit: did you use a combiner? What does it do, specifically, in your implementation?
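
Worked example for questions 3-5, with made-up numbers (yours will differ): suppose the input is 30 GB and a 5-worker run takes 15 minutes. The processing rate is then 30 GB / 0.25 hours = 120 GB per hour. If that run kept 6 instances busy (say, 5 workers plus a master; substitute whatever your cluster actually launched), each billed for 1 hour after rounding up at $0.68, the run cost 6 * $0.68 = $4.08, or about $0.14 per GB processed. If the same job then takes 10 minutes on 9 workers, the speedup from 5 to 9 workers is 15/10 = 1.5x.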

If you find bugs in your Java code, you can fix them after the milestone and before the final submission.

When things go wrong

Stopping running Hadoop jobs

Often it is useful to kill a job. Stopping the Java program that launches the job is not enough; Hadoop on the cluster will continue running the job without it. The command to tell Hadoop to kill a job is:
    
    hc large job -kill JOBID

where JOBID is a string like "job_201101121010_0002" that you can get from the output of your console log or the web interface. You can also list jobs using 

      hc large job -list

Proxy problems

"12/34/56 12:34:56 INFO ipc.Client: Retrying connect to server: ec2-123-45-67-89.amazonaws....."

or

"Exception in thread "main" java.io.IOException: Call to ec2-123-45-67-89.compute-1.amazonaws.com/123.45.67.89:8020 failed on local exception: java.net.SocketException: Connection refused

<long stack trace>"


If you get this error from 'hc' try running

      hadoop-ec2 proxy large

again. If you continue getting this error from hc after doing that, check that your cluster is still running using
    
    hadoop-ec2 list large

and by making sure the web interface is accessible.

Deleting configuration files

If you've accidentally deleted one of the configuration files created by ec2-util --init, you can recreate it by rerunning ec2-util --init.

Last resort

It's okay to stop and restart a cluster if things are broken. But it wastes time and money.

Resources


All the code you need to write for this project should fit inside the map and reduce functions that we gave you in the skeleton. As a result, we're shielding you from the full complexity of the Hadoop APIs.

But if you need to dive deeper, there are a bunch of resources about MapReduce available on the web. It's okay [encouraged!] to use them. But be warned: Hadoop changed its APIs in backward-incompatible ways going from v19 to v21. The old API is under org.apache.hadoop.mapred. The new API is under org.apache.hadoop.mapreduce.  Version 20 is caught somewhere in the middle, and the documentation is muddled in some places. You may need to tinker a bit to get things working.  With those caveats:

The Hadoop Javadoc for v20:  http://hadoop.apache.org/common/docs/r0.20.0/api/index.html
    [Note: The API part is machine-generated and basically correct. Some of the descriptive text for each class is stale/misleading. Watch out.]
The tutorial for v21:   http://hadoop.apache.org/mapreduce/docs/r0.21.0/mapred_tutorial.html


Some of you may be wondering why we're having you use tools that are this rough. The answer is, Hadoop is basically the only big-data tool out there. Yahoo!, Facebook, and Twitter all have bet their companies on using it. And, while Hadoop is pretty painful to use compared to polished consumer software, it's fairly representative of the just-good-enough-to-use tools that make modern Internet companies work. Welcome to computer science without the training wheels.    


Appendix: Miscellaneous EC2-related Commands

We don't think you'll need any of these...


Terminating/Listing Instances Manually
You can get a raw list of all virtual machines you have running using

    ec2-my-instances

This will include the public DNS name (starts with "ec2-" and ends with "amazonaws.com") and the private name (starts with "ip-...") of each virtual machine you are running, as well as whether it is running or shutting down or recently terminated, its type, the SSH key associated with it (probably USERNAME-default) and the instance ID, which looks like "i-abcdef12". You can use this instance ID to manually shut down an individual machine:

    ec2-terminate-instances i-abcdef12

Note that this command will not ask for confirmation. ec2-terminate-instances comes from the EC2 command line tools. ec2-my-instances is an alias for the command line tools' ec2-describe-instances command with options to only show your instances rather than the whole class's.

Listing/removing/moving files from S3
    
    s3cmd ls s3://cs61c/USERNAME/...
    s3cmd del s3://cs61c/USERNAME/...
    s3cmd mv s3://cs61c/USERNAME/... s3://... (OR to/from local file)

You can use glob patterns (e.g. s3://cs61c/USERNAME/out*) on S3 if you quote them. Note that you can delete/view other people's files from S3. Please do not abuse this.

Logging into your EC2 virtual machines

    hadoop-ec2 login large
    # or using a machine name listed by ec2-my-instances or hadoop-ec2 list
    ssh-nocheck -i ~/USERNAME-default.pem root@ec2-....amazonaws.com

The cluster you start is composed of ordinary Linux virtual machines. The file ~/USERNAME-default.pem is the private part of an SSH keypair for the machines you have started.

Viewing/changing your AWS credentials

You can view your AWS access key + secret access key using:
    ec2-util --list
If you somehow lose control of your AWS credentials, you can get new AWS access keys using
    ec2-util --rotate-secret 
    new-ec2-certificate
This is likely to break any running instances you have.