CS61C Spring 2014 Project 2 Part 2: Running MapReduce on Amazon EC2

TAs: Shreyas Chand, Alex Chou, Jay Patel
Part 2: Due 11/02 @ 23:59:59

Summary

In this part of the project, we'll use the power of EC2 to strongly solve our Sliding puzzle. We'll be using 6 or 12 machines at a time, each with the following specifications:

High-CPU Extra Large Instance:
7 GiB of memory
20 EC2 Compute Units (8 virtual cores with 2.5 EC2 Compute Units each)
(according to Amazon, this is approximately equivalent to a machine
with 20 early-2006 1.7 GHz Intel Xeon Processors)
1.65 TiB of instance storage
64-bit platform
I/O Performance: High
API name: c1.xlarge

Thus, when we're working with 12 instances (machines), we're working under the following "constraints":

12 High-CPU Extra Large Instances:
Total of 84 GiB of memory
Total of 240 early-2006 1.7 GHz Intel Xeon Processors
Total of 19.8 TiB of instance storage

That's quite a bit of power!

Preparing Your Code

Before deploying anything to EC2, there are a few updates you will want to make to your code. Since the boards we will be working with are larger now, and our RDDs will be larger in general, there are a few optimizations we should take care of first. To copy the new skeleton over, enter the following command:

$ cp -r ~cs61c/proj/02/02 ~/proj2-2

Please be careful when copying over the skeleton files! Don't accidentally overwrite all of your work.

Inside the skeleton code directory, you'll find a new copy of SlidingBfsSpark.py. Copy your code from part 1 into this file. You'll find two changes we've made: a global variable PARTITION_COUNT and an additional parameter to solve_puzzle called slaves. Both are described in more detail below.

Partitioning

If you were able to reach the performance goals in part 1 without using partitioning, we still recommend you implement it now. With bigger boards, much more data has to be shuffled, and partitioning minimizes this. For more information, see the bottom of the part 1 spec. Python's built-in hash() function is a good starting point for thinking about a partitionFunc.

Once you've implemented partitioning (or already have it from part 1), you may find the PARTITION_COUNT variable useful. It is set to what we found to be the sweet spot for the number of partitions, and it is based on the number of machines you are parallelizing across.
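As a rough illustration, a partition function might look like the sketch below. This is not the required implementation: the function name and the RDD name mentioned in the comment are placeholders, and you should tune your own approach against the performance goals.

def partition_func(board):
    # Python's built-in hash() is a reasonable starting point; Spark takes this value
    # modulo the partition count to choose a partition for each key.
    return hash(board)

# e.g. frontier = frontier.partitionBy(PARTITION_COUNT, partition_func), so that the
# same board lands in the same partition across iterations and less data is shuffled.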

Hashing

This hashing optimization is ONLY intended for use in part 2. You will receive NO credit for using it in part 1, because it is not required to reach the performance benchmarks.

For the second part of the project, in order to increase the performance of our code some more, we will explore a way to shrink our overall RDD size as well as the amount of data shuffled before reducing. To do this, we have created a two-way hash function that maps a board layout to an integer and vice versa.

In the updated Sliding.py files, you will see two new functions: board_to_hash and hash_to_board. As the names suggest, these functions hash a board layout to a number, and from that number we can also recover the original board layout. You do not have to worry about the implementation of these functions; the important part is to understand that each board layout maps to exactly one number and vice versa. To get a better sense of how this mapping works, take a look at the docstrings for board_to_hash and hash_to_board.

Now comes the important part: how can you make use of this hash function? As mentioned earlier, the advantage of this hash is that we can shrink each board to just an int and easily convert it back to the original board layout. Since our part 1 code created key-value pairs keyed on the board, we can apply this hash to each board before emitting it as a key-value pair in our map and reduce steps. Just make sure to convert the hash back to a board before you pass it to any of the helper functions, like children!

Do not unhash your boards before outputting them to a text file; doing so would greatly increase the size of your output files.
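To make the idea concrete, here is a minimal sketch of a BFS expansion step that keeps boards hashed in the RDD. The function name expand, the board dimensions, and the argument order of the Sliding helpers are assumptions for illustration only; check the docstrings in Sliding.py for the real signatures.

import Sliding   # provided helper module: board_to_hash, hash_to_board, children

WIDTH, HEIGHT = 5, 2   # example dimensions for the medium (5x2) puzzle

def expand(kv):
    # Hypothetical map step: takes a (hashed_board, level) pair and emits hashed children.
    hashed, level = kv
    board = Sliding.hash_to_board(WIDTH, HEIGHT, hashed)     # unhash before using helpers
    for child in Sliding.children(WIDTH, HEIGHT, board):     # children expects a real board
        yield (Sliding.board_to_hash(WIDTH, HEIGHT, child), level + 1)

# e.g. frontier = frontier.flatMap(expand), so keys stay hashed everywhere else in the RDD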

Partitioning Output

Also, now that our boards are larger than the boards from the previous parts, we would not be making good use of parallelism if we funneled all of our key-value pairs back into one partition and wrote them out serially. Instead, we will use saveAsTextFile, just as we did in lab. This way, each partition is written to a separate file in the specified output directory. To implement this change, given the final RDD that you want to output, say nodes:

nodes.coalesce(NUM_WORKERS).saveAsTextFile(output) # Let NUM_WORKERS be the number of workers (6 or 12)

This will save your RDD as a folder named output in a shared file system (keep reading for instructions on how to fetch those results). Inside the directory there will be several text files, one for each partition, labeled part-00**. You may find the slaves parameter useful in determining how many partitions to coalesce to.

Prepare Your Account

You MUST use one of the following servers for this project.

First, we need to take a few steps to prepare your account for launching jobs on EC2. Begin by running these commands (you should only need to do this once):

$ bash ~cs61c/ec2-init.sh 
$ source ~/ec2-environment.sh

In order to take advantage of the many machines we will be using, we have also made some modifications to the Makefile. As you run your code on different-sized clusters, you will have to update it to reflect the number of machines Spark can utilize. You will also have to update the line in the Makefile that tells Spark where our cluster is running. The lines that need to be updated appear at the very top, as assignments to the variables MASTER and SLAVE_COUNT.

Yay! Everything should now be ready to go for EC2. Don't forget to keep updating the Makefile as you try different cluster sizes.

Head over to the next section to learn how to actually run things on EC2.

How to Run on EC2

Before beginning, we HIGHLY recommend you use a terminal multiplexer such as screen in case your ssh connection drops. Read more in the Screen section at the end of this spec.

Here, we'll practice by running our Sliding puzzle code on EC2 with a 5x2 board. Please read this entire section carefully before performing any of the steps. Also, please do not start a cluster for the sole purpose of running this example. Even if you run it for only 30 seconds and then shut down the instances, the University is charged for a full hour.

First, let's describe how your code will run on EC2. Through AWS, we'll be renting a cluster -- which is just a set of computers connected together on a fast local network. Spark runs on top of these machines and designates one as the master and the rest as slaves. The master runs the driver program, which oversees all operations and allocates work to the slaves (aka worker nodes). The slaves are used to perform all parallelized operations, such as map and reduce.

Now, we'll want to go ahead and launch our cluster. To launch a cluster with N slaves, run the following command (from a lab machine):

$ spark-ec2 launch --slaves N --instance-type=c1.xlarge $USER

This step will take a few minutes, so don't worry if things seem to hang for a bit. However, if you do run into issues check the troubleshooting section for some tips.

The next step is to copy our code to the master and log in to it. First, run the following command to find out the hostname for the master:

$ spark-ec2 get-master $USER # Returns the hostname of the master. 

You should set the MASTER variable in the Makefile to the URL returned by the above command. You will also need to use it in the following commands, which show you how to copy your code to the EC2 master server. Wherever you see MASTER in the commands, use the URL instead. Before you go ahead with the copy, make sure you have updated the Makefile with both the correct MASTER URL and the correct SLAVE_COUNT for the current cluster.

$ scp -i ~/$USER-default.pem -r proj2-2 root@MASTER: # Copies your code to the MASTER (the URL from the last step) of your cluster
$ spark-ec2 login $USER # Logs you into the MASTER of your cluster
[EC2-MASTER]$ cd proj2-2 # Change directory to where your code was scp'ed

Now, we're ready to run our job!

[EC2-MASTER]$ make ec2-medium

Your job will begin. In the first few log messages, you will see a Spark Web UI URL like the following:

INFO SparkUI: Started SparkUI at http://ec2-54-226-179-2.compute-1.amazonaws.com:8080/

Take note of this URL: if you paste it into your browser, you'll be able to view job status and all kinds of cool stats, as long as the Spark job is still running. You may need some of this information to answer the required questions later (so check out the questions before starting).

UPDATED: 10/28/2014 11:59:00 PM

If everything ran correctly, your output will have been placed in the shared file system. To view the output files use the following command:

# List all the generated part files.
$  ~/ephemeral-hdfs/bin/hadoop fs -ls hdfs:///user/root/medium-puzzle-out/ # Or use large-puzzle-out for the 4x3 puzzle
Warning: $HADOOP_HOME is deprecated.

Found 3 items
-rw-r--r--   3 root supergroup          0 2014-10-29 04:57 /user/root/medium-puzzle-out/_SUCCESS
-rw-r--r--   3 root supergroup   13096372 2014-10-29 04:57 /user/root/medium-puzzle-out/part-00000
-rw-r--r--   3 root supergroup   11749378 2014-10-29 04:57 /user/root/medium-puzzle-out/part-00001
... # more lines depending on number of slaves

You can use the above output to calculate the total file size. The fifth column lists the number of bytes in each file. Add them all up and convert the total to an IEC prefix (e.g. MiB) for readability, as in the example below.
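For instance, using just the two part files shown in the listing above (a real run will have more):

total_bytes = 0 + 13096372 + 11749378     # sum of column five from the listing
total_mib = total_bytes / float(2 ** 20)  # about 23.7 MiB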

To fetch the output files use the following commands:

$  ~/ephemeral-hdfs/bin/hadoop fs -cat hdfs:///user/root/medium-puzzle-out/part-00000 >> medium-puzzle-out
$  ~/ephemeral-hdfs/bin/hadoop fs -cat hdfs:///user/root/medium-puzzle-out/part-00001 >> medium-puzzle-out
... # Run these for each part file in the output directory (and don't forget to use the correct name depending on the puzzle size!)

For the large puzzle, those commands are not very feasible, since the output will be several gigabytes. Specifically, do NOT copy the output from running your code on the 4x3 puzzle. In that case, you should just use the ls command from above to calculate the total file size.

If you need to run another MapReduce job after one concludes, you need to clean the output directory on the Amazon servers first. Before you clean the output directory, make sure you have retrieved all the data you need! Once you are sure you are done, run one of the following:

$ make clean-medium
$ make clean-large

Finally, once we're done grabbing all the information/data we need and have run all the jobs we need, we go ahead and shut down the cluster. This destroys all data that we placed on the machines in the cluster. In addition, the URLs you used earlier to see the web GUI will cease to function. Thus, be sure that you grab all the data you need to finish your project before terminating. To terminate, run the following from the instructional machine:

$ spark-ec2 destroy $USER 

Please note that if you leave an instance running for an unreasonable amount of time (for example, overnight), you will lose points. We track the usage of each account and you will need to give us a cost assessment in your final report.

Assignment

Part 2 (due 11/02/2014 @ 23:59:59)

Before running your code on EC2, run it locally to be sure it is correct and reasonably efficient (you should have done this for part 1 anyway). If you are still having trouble, please talk to us on Piazza in a private post before running on EC2.

Complete the following questions (and do the required runs on EC2) and place the answers in a plain-text file called proj2-2.txt. The contents of this file are what will be graded for this part of the project. Please read through all of the questions before starting to run anything on EC2, so that you know exactly what information you need to collect before terminating a cluster.

  1. Run your code on a 5x2 Sliding Puzzle on clusters with 6 slaves and with 12 slaves, and on a 4x3 Sliding Puzzle on a cluster with 12 slaves. How long does each run take?
  2. What was the mean processing rate (in MB/s) of your code for 6 and 12 instances? You can approximate the total data processed by the total size of your output files (see the sketch after this list).
  3. What was the speedup for 12 instances relative to 6 instances for the 5x2 board? What do you conclude about how well Spark parallelizes your work? Is this a case of strong scaling or weak scaling? Why or why not?
  4. What was the price per GB processed for each cluster size? (Recall that an extra-large instance costs $0.68 per hour, rounded up to the nearest hour.)
  5. How many dollars in EC2 credits did you use to complete this project?
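If it helps, the arithmetic for questions 2 and 4 boils down to something like the sketch below. The helper names are made up, and deciding which instances count toward the cost is left to you; this is not an official formula.

import math

def processing_rate_mb_s(output_bytes, runtime_seconds):
    # Question 2: mean rate in MB/s (use MiB instead if you prefer; just be consistent).
    return (output_bytes / 1e6) / runtime_seconds

def price_per_gb(runtime_seconds, num_instances, output_bytes, hourly_rate=0.68):
    # Question 4: instances are billed per hour, rounded up to the nearest hour.
    hours = math.ceil(runtime_seconds / 3600.0)
    return (hours * num_instances * hourly_rate) / (output_bytes / 1e9)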

Submission

The full proj2-2 is due Sunday (11/02/2014). To submit proj2-2, enter the following command. You should be turning in proj2-2.txt.

$ submit proj2-2

Grading

Part 1 is worth 2/3 of your Project 2 grade.

Part 2 is worth 1/3 of your Project 2 grade.

Troubleshooting

If you are trying to run any spark-ec2 command and you see "/home/ff/cs61c/bin/spark-ec2: ~cs61c/spark-1.1.0/ec2/spark-ec2: not found": make sure you are on a Linux machine.

If you are launching a cluster and you see "The requested Availability Zone is currently constrained and we are no longer accepting new customer requests for t1/m1/c1/m2/m3 instance types. Please retry your request by not specifying an Availability Zone or choosing us-east-1d, us-east-1b, us-east-1a.": use the --zone flag with one of the specified zones.

If you are launching a cluster and you see "ssh: connect to host ec2-54-226-179-2.compute-1.amazonaws.com port 22: Connection refused" followed by "Error executing remote command, retrying after 30 seconds": wait for the script to retry connecting. If this fails repeatedly, run spark-ec2 launch --resume $USER.

Acknowledgements

Thanks to Scott Beamer (a former TA), the original creator of this project, as well as Alan Christopher, Ravi Punj, and Sung Roa Yoon.



Screen - terminal multiplexer

Programs such as screen and tmux are terminal multiplexers. They enable two main features: (1) having multiple terminal windows within one, and (2) being able to reconnect to an ssh session if your network connection drops. Here we'll focus on (2).

You will likely find screen to be a very valuable tool even beyond proj2-2!

screen works by creating sessions. Let's begin by logging into a lab server and creating a new session called foo:

$ screen -S foo

You'll notice you are now in a new blank shell prompt. From here you can run any commands you'd like.

You can explicitly detach from your session with the keyboard shortcut: (Ctrl+A) D. Or you might find yourself accidentally detached if your network connection dies. When you are detached, any processes in the session still continue to run (which isn't true with a normal ssh connection).

In either case, to reconnect to the session, you can ssh onto the same server and reconnect with:

$ screen -rd foo

If you forget the name of your session or want to see all of your open sessions, use screen -ls.

To see all of screen's available features, run screen -h.

Sanity check exercise: can you and your partner create a new session called calendar that runs the command cal? Simulate your network dropping by X-ing out of the ssh connection. Can you reconnect to the calendar session? How do you delete a session you are done with? (check screen -h)