Project 2: Building a Full Inverted Index for an Online E-Book Store

This work is Copyright © 2007 University of California, Berkeley, and is based on work Copyright © 2007 University of Washington and licensed under the Creative Commons Attribution 3.0 License -- http://creativecommons.org/licenses/by/3.0/

Overview:

In this assignment, your job is to design and implement an important component of an online electronic-book store - the capability for customers of the book store to search for e-books by full-text content. The component will run on the "Cloud Computing Cluster" (aka "Google/IBM Cluster").

The Cloud Computing Cluster is a remote facility managed by Google and IBM. It consists of 40 64-bit dual-core AMD Opterons (80 CPUs) running Redhat SELinux in a Xen virtual machine. Google and IBM have donated access time to EECS students. Google and IBM provide EECS with login "tokens" that we give to students in selected classes. For this assignment, you will create an account on the cluster and use either "ssh" (or "putty") or a special Eclipse plugin to run Hadoop MapReduce jobs on the cluster.

This assignment involves creating a full inverted index of the contents of all the books. A full inverted index is a mapping of words to their location (document file and offset) in a set of documents. Most modern search engines utilize some form of a full inverted index to process user-submitted queries. In its most basic form, an inverted index is a simple hash table which maps words in the documents to some sort of document identifier, while a full inverted index maps words in the documents to some sort of document identifier and offset within the document. For example, if given the following two documents:

    Doc1 (if you don’t know the story, search for this on Wikipedia):
    Buffalo buffalo Buffalo buffalo 
    buffalo buffalo Buffalo buffalo.

    Doc2:
    Buffalo are
    mammals.


You could construct the following inverted file index:

    Buffalo -> (Doc1, 1), (Doc1, 2), (Doc2, 1)
    buffalo -> (Doc1, 1), (Doc1, 2)
    buffalo. -> (Doc1, 2)
    are -> (Doc2, 1)
    mammals. -> (Doc2, 2)
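
To make the mapping concrete, here is a minimal in-memory sketch (no Hadoop involved) that builds this kind of index for the two sample documents, treating a location as a (document, line number) pair. The class name TinyInvertedIndex and its methods are made up for illustration; your own design may record locations differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Minimal in-memory sketch; class and method names are illustrative only.
class TinyInvertedIndex {
    // Maps each whitespace-separated token to a list of "(docId, lineNumber)" postings.
    static Map<String, List<String>> build(Map<String, String[]> docs) {
        Map<String, List<String>> index = new LinkedHashMap<>();
        for (Map.Entry<String, String[]> doc : docs.entrySet()) {
            String[] lines = doc.getValue();
            for (int i = 0; i < lines.length; i++) {
                String posting = "(" + doc.getKey() + ", " + (i + 1) + ")";
                for (String token : lines[i].trim().split("\\s+")) {
                    if (token.isEmpty()) {
                        continue;
                    }
                    List<String> postings = index.computeIfAbsent(token, k -> new ArrayList<>());
                    // Record each (document, line) location only once per word.
                    if (!postings.contains(posting)) {
                        postings.add(posting);
                    }
                }
            }
        }
        return index;
    }

    // The two sample documents, one array element per line.
    static Map<String, String[]> sampleDocs() {
        Map<String, String[]> docs = new LinkedHashMap<>();
        docs.put("Doc1", new String[] {
            "Buffalo buffalo Buffalo buffalo", "buffalo buffalo Buffalo buffalo."});
        docs.put("Doc2", new String[] {"Buffalo are", "mammals."});
        return docs;
    }

    public static void main(String[] args) {
        build(sampleDocs()).forEach((word, postings) ->
            System.out.println(word + " -> " + String.join(", ", postings)));
    }
}
```

Note that this sketch does no cleaning of the input, so "buffalo" and "buffalo." remain separate entries.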

To create the full inverted index in a scalable manner, you will use Hadoop (http://lucene.apache.org/hadoop/), an open-source implementation of Google's MapReduce (http://labs.google.com/papers/mapreduce.html) programming paradigm, to manage the distributed processing over multiple computers (nodes).

Assignment Outline:

You will start with an exercise to introduce you to Hadoop and how you use it in applications. Each student in the project team must perform the exercises in items 2 through 5 of the assignment outline (see below). Your design submission should include the design for item 6 of the assignment outline, a description of each project team member's project environment, AND the output from the Line Indexer example.

  1. Setting up your environment
  2. Hadoop concepts
  3. The Line Indexer Example
  4. Running a Map-Reduce on Hadoop
  5. Building the Inverted Book Index

References:

  http://inst.eecs.berkeley.edu/cgi-bin/pub.cgi?file=mapreduce.help (Read this document first)
  http://code.google.com/edu/parallel/mapreduce-tutorial.html
  http://www.alphaworks.ibm.com/tech/mapreducetools/
  http://www.michael-noll.com/wiki/Writing_An_Hadoop_MapReduce_Program_In_Python
  http://www.michael-noll.com/wiki/Running_Hadoop_On_Ubuntu_Linux_%28Single-Node_Cluster%29
  http://www.michael-noll.com/wiki/Running_Hadoop_On_Ubuntu_Linux_%28Multi-Node_Cluster%29
  Googling for "hadoop run map-reduce" will yield lots of programming examples.

Setting up your environment:

You will need to set up some software for this project: VMware Player (or VMware Workstation or Fusion), the Hadoop image, a login on the Google/IBM cluster, and the Eclipse plug-in for Hadoop.

Using VMware Player and the Hadoop Image --

The following VMware image contains a preconfigured single node instance of Hadoop that provides the same interface as a full cluster without any of the overhead. To use the pre-built Hadoop image, you first need to set up the free VMware Player (or alternatively, download either VMware Workstation or Fusion and request a license key from cs162@cory). You can download VMware Player for Linux or Windows from here (http://www.vmware.com/download/player/download.html).

Once you have VMware Player installed, download the local VMware image (110.1 MB) from here (original: http://dl.google.com/edutools/hadoop-vmware.zip). Or download an updated VM image with Hadoop 0.16.3 here. The image is packaged as a directory archive. To begin setup, unzip the image in the directory of your choice (you need at least 10GB, and the disk image can grow to 20GB). The VMware image package contains:

The system image is based on Ubuntu (version 7.04) and contains a Java machine (Sun JRE 6 - DLJ License v1.1) and the Hadoop 0.13.0 distribution.

To start the VMware Virtual Machine, go to the directory where the packaged files are unzipped, and run:

A new window will appear which will print a message indicating the IP address allocated to the guest OS. This is the IP address you will use to submit jobs from the command line or the Eclipse environment. The guest OS contains a running Hadoop infrastructure which is configured with:

The guest OS can be reached from the provided console or via SSH using the IP address indicated above. Log into the guest OS with:

Once the image is loaded, you can log in with the guest account. Hadoop is installed in the guest home directory (/home/guest/hadoop). Three scripts are provided for Hadoop maintenance purposes:

The Hadoop configuration can be edited by modifying the files in /home/guest/hadoop-conf/. For further information on this, go to the Hadoop Wiki.

To stop the Virtual Machine, log in as administrator and issue:


Running Jobs from the VM Command Line --

To run MapReduce programs from the command line, log into the guest OS, and use the Hadoop command line tool to manipulate HDFS files and MapReduce jobs. A set of simple MapReduce programs is included with the Hadoop distribution in hadoop-examples.jar. For example, to run the MapReduce approximation of pi included with the example files with four map tasks, each computing ten thousand samples, issue:

For more information see the Examples section on the Hadoop Wiki (http://wiki.apache.org/lucene-hadoop/). Alternatively Hadoop jobs can be run directly from Eclipse (see below).


Running a MapReduce Job on the Google/IBM Cloud Computing Cluster --

To run a Hadoop job, simply ssh into the cluster's gateway node by following the instructions here: http://inst.eecs.berkeley.edu/cgi-bin/pub.cgi?file=mapreduce.help#5.

Using the Eclipse Plug-in with the VM --

The IBM MapReduce Tools for Eclipse Plug-in is a robust plug-in that brings Hadoop support to the Eclipse platform, including support for server configuration, launching MapReduce jobs and browsing the distributed file system. This setup assumes that you are running Eclipse (version 3.3 or above) on your computer (you can find instructions for setting up Eclipse here). Instructions for using Subversion instead of CVS can be found here.

Once you have the plug-in working with Eclipse, you can add a new Hadoop server by:


Using the Eclipse Plug-in with the Google/IBM Cloud Computing Cluster --

To use the Eclipse plug-in with the Google/IBM Cloud Computing Cluster, follow the directions here: http://inst.eecs.berkeley.edu/cgi-bin/pub.cgi?file=mapreduce.help#6

Hadoop Concepts:

MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. Many real world tasks are expressible in this model.

Programs written in this functional style are automatically parallelized and executed on a large cluster of commodity machines. The run-time system takes care of the details of partitioning the input data, scheduling the program's execution across a set of machines, handling machine failures, and managing the required inter-machine communication. This allows programmers without any experience with parallel and distributed systems to easily utilize the resources of a large distributed system.  Google's implementation of MapReduce runs on a large cluster of commodity machines and is highly scalable: a typical MapReduce computation processes many terabytes of data on thousands of machines. Google programmers find the system easy to use: hundreds of MapReduce programs have been implemented and upwards of one thousand MapReduce jobs are executed on Google's clusters every day.

Hadoop is a free Java software framework that supports distributed applications running on large clusters of commodity computers that process huge amounts of data. It is an Apache Lucene sub-project and was originally developed to support distribution for the Nutch web crawler. Hadoop consists of a distributed filesystem reminiscent of GoogleFS named the "Hadoop Distributed File System" (HDFS) and a MapReduce implementation. Hadoop was named after a stuffed elephant belonging to the child of its creator, Doug Cutting.

To use Hadoop, you write two classes: a Mapper and a Reducer. The Mapper class contains a map function, which is called once for each input record and outputs any number of intermediate <key, value> pairs. What code you put in the map function depends on the problem you are trying to solve. Let's start with a short example. Suppose the goal is to create an "index" of a body of text -- we are given a text file, and we want to output a list of words annotated with the line number at which each word appears. For that problem, an appropriate Map strategy is: for each word in the input, output the pair <word, line-number>. For example, suppose we have this five-line high school football coach quote as our input data set:

We are not what
we want to be,
but at least
we are not what
we used to be.

Running the Map code, which outputs a pair <word, line-number> for each word, yields the set of pairs...

<we, 1>
<are, 1>
<not, 1>
<what, 1>
<we, 2>
<want, 2>
<to, 2>
<be, 2>
<but, 3>
etc...
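
The Map strategy above can be sketched in plain Java without any Hadoop classes. This is only an illustration: the class name WordLineMapSketch is made up, and the choice to lowercase each word and strip punctuation is an assumption made so that the output matches the pairs listed above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the Map strategy; no Hadoop classes are used here.
class WordLineMapSketch {
    // Emits one "<word, lineNumber>" pair per word, lowercasing the line and
    // splitting on non-letters so that "be," is emitted as "be".
    static List<String> map(int lineNumber, String line) {
        List<String> pairs = new ArrayList<>();
        for (String token : line.toLowerCase().split("[^a-z]+")) {
            if (!token.isEmpty()) {
                pairs.add("<" + token + ", " + lineNumber + ">");
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        String[] quote = {
            "We are not what", "we want to be,", "but at least",
            "we are not what", "we used to be."};
        // Call map() once per line, as the framework would once per record.
        for (int i = 0; i < quote.length; i++) {
            map(i + 1, quote[i]).forEach(System.out::println);
        }
    }
}
```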

For now we can think of the <key, value> pairs as a nice linear list, but in reality, the Hadoop process runs in parallel on many machines. Each process has a little part of the overall Map input (called a map shard), and maintains its own local cache of the Map output. For a description of how it really works, you should read the Hadoop Wiki (http://wiki.apache.org/lucene-hadoop/) and the Google paper: "MapReduce: Simplified Data Processing on Large Clusters," by Jeffrey Dean and Sanjay Ghemawat, OSDI'04: Sixth Symposium on Operating System Design and Implementation, San Francisco, CA, December, 2004 (their slide presentation can be found here: http://labs.google.com/papers/mapreduce.html).

After the Map phase produces the intermediate <key, value> pairs, they are efficiently and automatically grouped by key by the Hadoop system in preparation for the Reduce phase (this grouping is known as the Shuffle phase of a map-reduce). For the above example, that means all the "we" pairs are grouped together, all the "are" pairs are grouped together, and so on, like this (each group is shown as one line):

<we, 1> <we, 2> <we, 4> <we, 5>
<are, 1> <are, 4>
<not, 1> <not, 4>
<what, 1> <what, 4>
<want, 2>
<to, 2> <to, 5>
<be, 2> <be, 5>
<but, 3>
<at, 3>
<least, 3>
<used, 5>

The Reducer class contains a reduce function, which is then called once for each key -- one reduce call for "we", one for "are", and so on. Each reduce looks at all the values for that key and outputs a "summary" value for that key in the final output. So in the above example, the reduce is called once for the "we" key, and passed the values the mapper output: 1, 4, 2, and 5 (the values going into reduce are not in any particular order). If reduce computes a summary value string made of the line numbers sorted into increasing order, then the Reduce phase on the above pairs produces the pairs shown below. The Reduce phase also sorts the output <key, value> pairs into increasing order by key:

<are, 1 4>
<at, 3>
<be, 2 5>
<but, 3>
<least, 3>
<not, 1 4>
<to, 2 5>
<used, 5>
<want, 2>
<we, 1 2 4 5>
<what, 1 4>
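
The grouping and summarizing steps can also be imitated on one machine. The sketch below is plain Java, not Hadoop: a TreeMap stands in for the Shuffle phase (grouping values by key and keeping keys in increasing order), and a reduce function sorts one key's line numbers into a space-separated string. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-machine imitation of Shuffle and Reduce; names are illustrative only.
class ShuffleReduceSketch {
    static final String[] QUOTE = {
        "We are not what", "we want to be,", "but at least",
        "we are not what", "we used to be."};

    // Shuffle: group line numbers by word. A TreeMap keeps keys in increasing order.
    static Map<String, List<Integer>> shuffle(String[] lines) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (int i = 0; i < lines.length; i++) {
            for (String token : lines[i].toLowerCase().split("[^a-z]+")) {
                if (!token.isEmpty()) {
                    groups.computeIfAbsent(token, k -> new ArrayList<>()).add(i + 1);
                }
            }
        }
        return groups;
    }

    // Reduce: summarize one key's values as sorted, space-separated line numbers.
    // The input order is not assumed, so the values are sorted here.
    static String reduce(List<Integer> values) {
        List<Integer> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        StringBuilder summary = new StringBuilder();
        for (int lineNumber : sorted) {
            if (summary.length() > 0) {
                summary.append(' ');
            }
            summary.append(lineNumber);
        }
        return summary.toString();
    }

    public static void main(String[] args) {
        shuffle(QUOTE).forEach((word, lineNumbers) ->
            System.out.println("<" + word + ", " + reduce(lineNumbers) + ">"));
    }
}
```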

Like Map, Reduce is also run in parallel on a group of machines. Each machine is assigned a subset of the keys to work on (known as a reduce shard), and outputs its results into a separate file.
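
One common way to assign keys to reduce shards is to hash the key and take the remainder modulo the number of reducers, so that every pair with the same key lands on the same machine. The sketch below illustrates that idea in plain Java; the class name is made up, and you should treat the details as an assumption about how a partitioner can work rather than as a description of the cluster's configuration.

```java
// Hash-partitioning sketch; the class name and reducer count are illustrative only.
class ReduceShardSketch {
    // Assigns a key to one of numReducers shards. The mask clears the sign bit
    // so that a negative hash code can never produce a negative shard number.
    static int shardFor(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        String[] keys = {"we", "are", "not", "what", "want", "to", "be"};
        for (String key : keys) {
            System.out.println(key + " -> reduce shard " + shardFor(key, 3));
        }
    }
}
```

Because the shard depends only on the key, all values for "we" reach the same reduce call no matter which mapper produced them.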


Line Indexer Example --

Consider a simple "line indexer" (all of the example code here is available to be checked out, built, and run). Given an input text, the line indexer uses Hadoop to produce an index of all the words in the text. For each word, the index has a list of all the locations where the word appears and a text excerpt of each line where the word appears. Running the line indexer on the complete works of Shakespeare yields the following entry for the word "cipher":

38624 To cipher what is writ in learned books,
12046 To cipher me how fondly I did dote;
34739 Mine were the very cipher of a function,
16844 MOTH To prove you a cipher.
66001 ORLANDO Which I take to be either a fool or a cipher.

The Hadoop code below for the line indexer is actually pretty short. The Map code extracts one word at a time from the input, and the Reduce code combines all the data for one word.


Line Indexer Map --

A Java Mapper class is defined in terms of its input and intermediate <key, value> pairs. To declare one, simply subclass from MapReduceBase and implement the Mapper interface. The Mapper interface provides a single method: public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter). Note: these inner classes probably need to be declared "static". If you get an error saying ClassName.<init>() is not defined, try declaring your class static. The map function takes four parameters which in this example correspond to:

  1. WritableComparable key - the byte-offset
  2. Writable value - the line from the file
  3. OutputCollector output - this has the .collect method to output a <key, value> pair
  4. Reporter reporter - you can ignore this for now

The Hadoop system divides the (large) input data set into logical "records" and then calls map() once for each record. How much data constitutes a record depends on the input data type; for text files, a record is a single line of text. The main method is responsible for setting output key and value types.

Since in this example we want to output <word, offset> pairs, the types will both be Text (a basic string wrapper, with UTF8 support). It is necessary to wrap the more basic types because all input and output types for Hadoop must implement Writable (and key types must implement WritableComparable), which handles the writing and reading from disk.
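
To see why a wrapper type is needed, it can help to look at what such a type has to do: write itself to a binary stream and read itself back. The sketch below does not use Hadoop. SimpleWritable and StringBox are hypothetical stand-ins, written for this illustration, for the roles that Writable and Text play; only the java.io classes are real.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class WritableSketch {
    // Hypothetical stand-in for the serialization contract: a type that can
    // write itself to, and read itself back from, a binary stream.
    interface SimpleWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    // Hypothetical string wrapper; Comparable so that it could be sorted as a key.
    static final class StringBox implements SimpleWritable, Comparable<StringBox> {
        private String value = "";

        void set(String s) { value = s; }

        @Override public void write(DataOutput out) throws IOException { out.writeUTF(value); }

        @Override public void readFields(DataInput in) throws IOException { value = in.readUTF(); }

        @Override public int compareTo(StringBox other) { return value.compareTo(other.value); }

        @Override public String toString() { return value; }
    }

    // Serializes a string to bytes and reads it back into a fresh object.
    static String roundTrip(String s) {
        try {
            StringBox original = new StringBox();
            original.set(s);
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            StringBox copy = new StringBox();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("cipher"));
    }
}
```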


Line Indexer Map Code --

For the line indexer problem, the map code takes in a line of text and for each word in the line outputs a string key/value pair <word, offset:line>. The Map code below accomplishes that by:

  1. Parsing each word out of value. For the parsing, the code delegates to a utility StringTokenizer object that implements hasMoreTokens() and nextToken() to iterate through the tokens.
  2. For each word, getting the location from key.
  3. Calling output.collect(word, summary) to output a <key, value> pair for each word.


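// Imports needed at the top of LineIndexer.java (shared by the code fragments in this section):
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public static class LineIndexerMapper extends MapReduceBase implements Mapper
{
    // Reused for every output pair to avoid allocating new objects per word.
    private final static Text word = new Text();
    private final static Text summary = new Text();
    public void map(WritableComparable key, Writable val, OutputCollector output, Reporter reporter)
        throws IOException {
        String line = val.toString();
        // The value emitted for every word on this line is "byte-offset:line text".
        summary.set(key.toString() + ":" + line);
        // Lowercase, then split on whitespace, so "Cipher" and "cipher" share a key.
        StringTokenizer itr = new StringTokenizer(line.toLowerCase());
        while(itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            output.collect(word, summary);
        }
    }
}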

When run on many machines, each mapper gets part of the input -- so for example with 100 Gigabytes of data on 200 mappers, each mapper would get roughly its own 500 Megabytes of data to go through. On a single mapper, map() is called going through the data in its natural order, from start to finish. The Map phase outputs <key, value> pairs, but what data makes up the key and value is totally up to the Mapper code. In this case, the Mapper uses each word as a key, so the reduction below ends up with pairs grouped by word. We could instead have chosen to use the line-length as the key, in which case the data in the reduce phase would have been grouped by line length.

In fact, the map() code is not required to call output.collect() at all. It may have its own logic to prune out data simply by omitting collect. Pruning things in the Mapper is efficient, since it is highly parallel, and already has the data in memory. By shrinking its output, we shrink the expense of organizing and moving the data in preparation for the Reduce phase.
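
As an illustration of pruning inside the map step, the plain-Java sketch below mirrors the shape of the line indexer's map() but skips very short tokens instead of emitting them. The class name and the three-character cutoff are arbitrary choices made for this example, and a list of pairs stands in for the OutputCollector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Pruning sketch without Hadoop; a returned list stands in for output.collect().
class PruningMapSketch {
    static final int MIN_LENGTH = 3; // arbitrary cutoff chosen only for illustration

    // One call per line, zero or more emitted pairs.
    // Each pair is returned as {word, "offset:line"}.
    static List<String[]> map(long offset, String line) {
        List<String[]> emitted = new ArrayList<>();
        String summary = offset + ":" + line;
        StringTokenizer itr = new StringTokenizer(line.toLowerCase());
        while (itr.hasMoreTokens()) {
            String word = itr.nextToken();
            if (word.length() < MIN_LENGTH) {
                continue; // pruned: nothing is emitted for this token
            }
            emitted.add(new String[] {word, summary});
        }
        return emitted;
    }

    public static void main(String[] args) {
        for (String[] pair : map(0L, "To be or not")) {
            System.out.println("<" + pair[0] + ", " + pair[1] + ">");
        }
    }
}
```

Every pair dropped here is one less pair to sort, group, and move before the Reduce phase.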

Line Indexer Reduce --

Defining a Reducer is just as easy. Simply subclass MapReduceBase and implement the Reducer interface: public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter). The reduce() method is called once for each key; the values parameter contains all of the values for that key. The Reduce code looks at all the values and then outputs a single "summary" value. Given all the values for the key, the Reduce code typically iterates over all the values and either concatenates the values together in some way to make a large summary object, or combines and reduces the values in some way to yield a short summary value.

The reduce() method produces its final value in the same manner as map() did, by calling output.collect(key, summary). In this way, the Reduce specifies the final output value for the (possibly new) key. It is important to note that when running over text files, the input key is the byte-offset within the file. If the key is propagated to the output, even for an identity map/reduce, the file will be filled with the offset values. Not only does this use up a lot of space, but successive operations on this file will have to eliminate them. For text files, make sure you don't output the key unless you need it (be careful with the IdentityMapper and IdentityReducer).

Line Indexer Reduce Code --

The line indexer Reducer takes in all the <word, offset:line> key/value pairs output by the Mapper for a single word. For example, for the word "cipher", the pairs look like: <cipher, 38624:To cipher what is writ in learned books,>, <cipher, 12046:To cipher me how fondly I did dote;>, <cipher, ... >. Given all those <key, value> pairs, the reduce outputs a single value string. For the line indexer problem, the strategy is simply to concatenate all the values together to make a single large string, using "^" to separate the values. The choice of "^" is arbitrary -- later code can split on the "^" to recover the separate values. So for the key "cipher" the output value string will look like "38624:To cipher what is writ in learned books,^12046:To cipher me how fondly I did dote;^34739:Mine were the very cipher of a function,^ ...". To do this, the Reducer code simply iterates over values to get all the value strings, and concatenates them together into our output String.

public static class LineIndexerReducer extends MapReduceBase implements Reducer {
    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
        // Concatenate every "offset:line" value for this word, separated by '^'.
        boolean first = true;
        StringBuilder toReturn = new StringBuilder();
        while(values.hasNext()){
            if(!first)
                toReturn.append('^');
            first=false;
            toReturn.append(values.next().toString());
        }
        // Emit a single <word, joined values> pair for this key.
        output.collect(key, new Text(toReturn.toString()));
    }
}
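
The join-then-split idea can be tried outside Hadoop. The sketch below (plain Java, with a made-up class name) joins values with "^" the same way the Reducer does and then recovers them. One detail worth noticing: "^" is a regular expression metacharacter in Java, so it has to be quoted before being passed to String.split().

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

// Join and split on '^' without Hadoop; the class name is illustrative only.
class CaretJoinSketch {
    // Joins the values with '^' between them, like the Reducer's loop.
    static String join(Iterator<String> values) {
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        while (values.hasNext()) {
            if (!first) {
                joined.append('^');
            }
            first = false;
            joined.append(values.next());
        }
        return joined.toString();
    }

    // Recovers the separate values. Pattern.quote makes split() treat "^"
    // as a literal character instead of the start-of-input anchor.
    static List<String> split(String joined) {
        return Arrays.asList(joined.split(Pattern.quote("^")));
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList(
            "38624:To cipher what is writ in learned books,",
            "12046:To cipher me how fondly I did dote;");
        String joined = join(values.iterator());
        System.out.println(joined);
        split(joined).forEach(System.out::println);
    }
}
```

This round trip only works if no value contains "^" itself, which is worth keeping in mind when you choose a separator for your own index format.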

Line Indexer Main Program --
Given the Mapper and Reducer code, the short main() below starts the MapReduce running. The Hadoop system picks up a bunch of values from the command line on its own, and then the main() also specifies a few key parameters of the problem in the JobConf object, such as what Map and Reduce classes to use and the format of the input and output files. Other parameters, such as the number of machines to use, are optional and the system will determine good values for them if not specified.

public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(LineIndexer.class);
    conf.setJobName("LineIndexer");
    // The keys are words (strings):
    conf.setOutputKeyClass(Text.class);
    // The values are offsets+line (strings):
    conf.setOutputValueClass(Text.class);
    conf.setMapperClass(LineIndexer.LineIndexerMapper.class);
    conf.setReducerClass(LineIndexer.LineIndexerReducer.class);
    if (args.length < 2) {
        System.out.println("Usage: LineIndexer <input path> <output path>");
        System.exit(0);
    }
    conf.setInputPath(new Path(args[0]));
    conf.setOutputPath(new Path(args[1]));
    JobClient.runJob(conf);
}

Tasks:

  1. (10%) Running the Line Indexer MapReduce Job -- Each project team member should  run this MapReduce job, by first loading the line indexer files into Eclipse and compiling them into a jar file. Next, each person should start up the Hadoop VMware image and install the jar file in your virtual machine. Download the single file Shakespeare corpus (Shakespeare's First Folio of 35 Plays in a single file), copy it into your virtual machine, and unzip it. Then, follow the instructions above to run your line indexer on the corpus. Once you have the MapReduce job running correctly in the VMware image, repeat the exercise on the Google/IBM cluster. Save the output file for submission with your code.

  2. (10%) Building the Inverted Book Index (Step 1: Cleaning the input) -- A naive parser will group words by attributes which are not relevant to their meaning. Modify your parser  to "scrub" words. You can define "scrub" however you wish; some suggestions include case-insensitivity, punctuation-insensitivity, etc.

  3. (25%) Building the Inverted Book Index (Step 2: Eliminating Stop Words) -- Some words are so common that their presence in an inverted index is "noise," that is, they can obscure the more interesting properties of a document. Such words are called "stop words." For this part of the assignment, write a word count MapReduce function to perform a word count over a corpus of text files and to identify stop words. It is up to you to choose a reasonable threshold (word count frequency) for stop words, but make sure you provide adequate justification and explanation of your choice. A naive parser will group words by attributes which are not relevant to their meaning (e.g., "buffalo", "Buffalo", and "buffalo." are all the same word), so your parser should "scrub" words. It is up to you to define "scrub" however you wish; some suggestions include case-insensitivity, punctuation-insensitivity, etc. Once you have written your code, install it in your virtual machine. Then, download the multiple file Shakespeare corpus (Shakespeare's First Folio of 35 Plays in multiple files), copy it into your virtual machine, unzip it, and extract the files from the tar file. Run your code and collect the word counts for submission with all your Mapper and Reducer Java files. As a starting point, you can find the source code for a simple word count map-reduce function in:
    hadoop-0.14.2\src\examples\org\apache\hadoop\examples\WordCount.java

  4. (30%) Building the Inverted Book Index (Step 3: Creating a Full Inverted Index) -- For this portion of the assignment, you will design a MapReduce-based algorithm to calculate the inverted index over the Project Gutenberg corpus. Instead of creating a partial inverted file index (which maps words to their document ID as described above), you are to create a full inverted index, which maps words to their document ID + position in the document. It is up to you to decide how you will specify a word's position in the document. Note that your final inverted index should not contain the stop words identified in Step 2. How you choose to remove those words is up to you; one possibility is to create multiple MapReduce passes, but there are many possible ways to do this. The format of your MapReduce output (i.e., the inverted index) must be simple enough to be machine-parseable; it is not impossible to imagine your index being one of many data structures used in a search engine's indexing pipeline. Your submitted indexer should be able to run successfully on the Project Gutenberg corpus, where "successfully" means it should run to completion without errors or exceptions, and generate the correct word->DocID mapping. You are required to submit all relevant Mapper and Reducer Java files, in addition to any supporting code or utilities.

  5. (25%) Building the Inverted Book Index (Step 4: Querying the Full Inverted Index) -- Write a query program on top of your full inverted file index that accepts a user-specified query (one or more words) and  returns not only the document IDs but also a text "snippet" from each document showing where the query term appears in the document. Your query format should support "and," "or," and "not" operations. Since a real full inverted file index could be enormous, your query program should use a MapReduce-based algorithm to perform the query. You are required to submit all relevant Mapper and Reducer Java files.
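
As one illustration of what "scrubbing" could mean, the plain-Java sketch below lowercases each token, drops every character that is not a letter or digit, and then counts the scrubbed words. This is only one possible definition, the class and method names are made up, and it runs on a single machine; the assignment still asks you to choose and justify your own rules and to do the counting with MapReduce.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// One possible "scrub" definition plus a single-machine word count.
class ScrubSketch {
    // Lowercases the token and removes everything except letters and digits.
    // Returns "" if nothing is left (for example, for a token like "--").
    static String scrub(String token) {
        return token.toLowerCase(Locale.ROOT).replaceAll("[^\\p{L}\\p{Nd}]", "");
    }

    // Counts scrubbed words in a piece of text, skipping tokens that scrub to "".
    static Map<String, Integer> count(String text) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String raw : text.split("\\s+")) {
            String word = scrub(raw);
            if (!word.isEmpty()) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String doc1 = "Buffalo buffalo Buffalo buffalo buffalo buffalo Buffalo buffalo.";
        count(doc1).forEach((word, n) -> System.out.println(word + "\t" + n));
    }
}
```

With this definition, "buffalo", "Buffalo", and "buffalo." all count as the same word, but a contraction such as "don't" becomes "dont", which may or may not be what you want.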

 

Design Questions:

Please answer the following questions in your design document (due one day after the code for this phase is due).

  1. How long did you originally think this project would take you to finish?
  2. How much time did you actually spend on this project?
  3. What, if any, "stop words" did you find as a result of your WordCount? By what criteria did you determine that they were "noisy?" Were you surprised by any of these words?