This work is Copyright © 2007 University of California, Berkeley, and is based on work Copyright © 2007 University of Washington and licensed under the Creative Commons Attribution 3.0 License -- http://creativecommons.org/licenses/by/3.0/
This assignment involves creating a full inverted index of the contents of all the books. A full inverted index is a mapping of words to their locations (document file and offset) in a set of documents. Most modern search engines use some form of a full inverted index to process user-submitted queries. In its most basic form, an inverted index is a simple hash table that maps each word in the documents to some sort of document identifier; a full inverted index maps each word to a document identifier plus an offset within that document. For example, given the following two documents:
Doc1 (if you don’t know the story, search for this on Wikipedia):
Buffalo buffalo Buffalo buffalo
buffalo buffalo Buffalo buffalo.
Doc2:
Buffalo are
mammals.
You could construct the following inverted file index (here the offsets are line numbers within each document):
Buffalo -> (Doc1, 1), (Doc1, 2), (Doc2, 1)
buffalo -> (Doc1, 1), (Doc1, 2)
buffalo. -> (Doc1, 2)
are -> (Doc2, 1)
mammals. -> (Doc2, 2)
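To make the data structure concrete, here is a minimal in-memory sketch in Java (the class and method names are invented for illustration; the actual assignment builds the index with Hadoop instead):
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SimpleInvertedIndex {
    // Maps each word to its list of "(document, line)" postings.
    private final Map<String, List<String>> index = new HashMap<String, List<String>>();

    // Record that 'word' occurs in document 'doc' at line number 'line'.
    public void add(String word, String doc, int line) {
        List<String> postings = index.get(word);
        if (postings == null) {
            postings = new ArrayList<String>();
            index.put(word, postings);
        }
        postings.add("(" + doc + ", " + line + ")");
    }

    // Returns the postings list for 'word', or null if the word never occurs.
    public List<String> lookup(String word) {
        return index.get(word);
    }
}
For example, add("buffalo", "Doc1", 1) followed by add("buffalo", "Doc1", 2) makes lookup("buffalo") return [(Doc1, 1), (Doc1, 2)].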
To create the full inverted index in a scalable manner, you will use Hadoop (http://lucene.apache.org/hadoop/), an open-source implementation of Google's MapReduce (http://labs.google.com/papers/mapreduce.html) programming paradigm, to manage the distributed processing over multiple computers (nodes).
The following VMware image contains a preconfigured single node
instance of Hadoop that provides the same interface as a full cluster
without any of the overhead. To use the pre-built Hadoop image,
you first need to set up the free VMware Player (or alternatively,
download either VMware Workstation or Fusion and request a license key
from cs162@cory). You can download VMware
Player for Linux or Windows from here (http://www.vmware.com/download/player/download.html).
Once you have VMware Player installed, download the local VMware image (110.1 MB) from here (original: http://dl.google.com/edutools/hadoop-vmware.zip), or download an updated VM image with Hadoop 0.16.3 here. The image is packaged as a directory archive. To begin setup, unzip the image in a directory of your choice (you need at least 10GB of free space, and the disk image can grow to 20GB). The VMware image package contains:
image.vmx -- The VMware guest OS profile, a configuration file that describes the virtual machine characteristics (virtual CPU(s), amount of memory, etc.).
20GB.vmdk -- A VMware virtual disk used to store the contents of the virtual machine hard disk; this file grows as you store data on the virtual image. It is configured to store up to 20GB.
The system image is based on Ubuntu (version 7.04) and contains a Java virtual machine (Sun JRE 6, DLJ License v1.1) and the Hadoop 0.13.0 distribution.
To start the VMware Virtual Machine, go to the directory where the packaged files are unzipped, and run:
vmplayer image.vmx
A new window will appear and print a message indicating the IP address allocated to the guest OS. This is the IP address you will use to submit jobs from the command line or from the Eclipse environment. The guest OS contains a running, preconfigured Hadoop infrastructure.
The guest OS can be reached from the provided console or via SSH using the IP address indicated above. Log into the guest OS with one of the following accounts:
guest -- password: guest
root (administrator) -- password: root
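For example, to connect over SSH from your host machine (replace <guest-ip> with the IP address printed in the VMware console):
ssh guest@<guest-ip>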
Once the image is loaded, you can log in with the guest account. Hadoop is installed in the guest home directory (/home/guest/hadoop). Three scripts are provided for Hadoop maintenance purposes:
start-hadoop -- Starts the file system and MapReduce daemons.
stop-hadoop -- Stops all Hadoop daemons.
reset-hadoop -- Restarts a fresh Hadoop environment with an entirely empty file system. Note: you must stop all daemons before you reset.
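For example, to wipe HDFS and start over, stop the daemons and then reset:
stop-hadoop
reset-hadoop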
The Hadoop configuration can be edited by modifying the files in /home/guest/hadoop-conf/. For further information, see the Hadoop Wiki.
To stop the Virtual Machine, log in as administrator and issue:
poweroff
To run MapReduce programs from the command line, log into the guest OS and use the hadoop command-line tool to manipulate HDFS files and MapReduce jobs. A set of simple MapReduce programs is included with the Hadoop distribution in hadoop-examples.jar. For example, to run the MapReduce approximation of pi included with the example files, with four map tasks each computing ten thousand samples, issue:
hadoop jar hadoop-examples.jar pi 4 10000
For more information, see the Examples section on the Hadoop Wiki (http://wiki.apache.org/lucene-hadoop/). Alternatively, Hadoop jobs can be run directly from Eclipse (see below).
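The same hadoop tool also manipulates HDFS files. A few representative commands (the file and directory names here are placeholders):
hadoop dfs -ls /                     (list the HDFS root directory)
hadoop dfs -put mybook.txt books/    (copy a local file into HDFS)
hadoop dfs -cat books/mybook.txt     (print an HDFS file)
hadoop dfs -rm books/mybook.txt      (delete an HDFS file)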
To run a Hadoop job, ssh into the cluster's gateway node by following the instructions here: http://inst.eecs.berkeley.edu/cgi-bin/pub.cgi?file=mapreduce.help#5.
Using the Eclipse Plug-in with the VM --
The IBM MapReduce Tools for Eclipse plug-in brings Hadoop support to the Eclipse platform, including support for server configuration, launching MapReduce jobs, and browsing the distributed file system. This setup assumes that you are running Eclipse (version 3.3 or above) on your computer (you can find instructions for setting up Eclipse here). Instructions for using Subversion instead of CVS can be found here.
To open the MapReduce perspective, go to Window -> Open Perspective -> Other... -> Map/Reduce. To open the server view, go to Window -> Show View -> Other... -> Map Reduce Tools -> Map Reduce Servers. Once you have the plug-in working with Eclipse, you can add a new Hadoop server by specifying:
Server address -- the provided IP address (see above)
Installation directory -- /home/guest/hadoop
Username -- guest
Password -- guest (when prompted)

Programs written in the MapReduce functional style are automatically parallelized and executed on a large cluster of commodity machines. The run-time system takes care of the details of partitioning the input data, scheduling the program's execution across a set of machines, handling machine failures, and managing the required inter-machine communication. This allows programmers without any experience with parallel and distributed systems to easily utilize the resources of a large distributed system. Google's implementation of MapReduce runs on a large cluster of commodity machines and is highly scalable: a typical MapReduce computation processes many terabytes of data on thousands of machines. Google programmers find the system easy to use: hundreds of MapReduce programs have been implemented, and upwards of one thousand MapReduce jobs are executed on Google's clusters every day.
Hadoop is a free Java software framework that supports distributed applications running on large clusters of commodity computers to process huge amounts of data. It is an Apache Lucene sub-project and was originally developed to support distribution for the Nutch web crawler. Hadoop consists of a distributed file system reminiscent of the Google File System, named the Hadoop Distributed File System (HDFS), and a MapReduce implementation. Hadoop was named after the stuffed elephant belonging to the child of its creator, Doug Cutting.
As a small example, consider the following five-line input text:
We are not what
we want to be,
but at least
we are not what
we used to be.
Running Map code that, for each word, outputs a pair <word, line-number> yields the set of pairs...
<we, 1>
<are, 1>
<not, 1>
<what, 1>
<we, 2>
<want, 2>
<to, 2>
<be, 2>
<but, 3>
etc...
For now we can think of the <key, value> pairs as a
nice linear list, but in reality, the Hadoop process runs in parallel on many
machines. Each process has a little part of the overall Map input (called a map
shard), and maintains its own local cache of the Map output. For a description
of how it really works, you should read the Hadoop Wiki (http://wiki.apache.org/lucene-hadoop/) and the
Google paper: "MapReduce: Simplified Data Processing on Large Clusters,"
by Jeffrey Dean and Sanjay Ghemawat, OSDI'04: Sixth Symposium on
Operating System Design and Implementation, San Francisco, CA,
December, 2004 (Their slide presentation can be found here: http://labs.google.com/papers/mapreduce.html). After the Map phase
produces the intermediate <key, value> pairs they are efficiently and
automatically grouped by key by the Hadoop system in preparation for the Reduce
phase (this grouping is known as the Shuffle phase of a MapReduce). For the above example, that means all the "we" pairs are grouped together, all the "are" pairs are grouped together, and so on, as shown below with each group on its own line...
<we, 1> <we, 2> <we, 4> <we, 5>
<are, 1> <are, 4>
<not, 1> <not, 4>
<what, 1> <what, 4>
<want, 2>
<to, 2> <to, 5>
<be, 2> <be 5>
<but, 3>
<at, 3>
<least, 3>
<used, 5>
The Reducer class contains a reduce function, which is then called once for each key -- one reduce call for "we", one for "are", and so on. Each reduce call looks at all the values for its key and outputs a "summary" value for that key in the final output. So in the above example, reduce is called once for the "we" key and is passed the values the mapper output: 1, 4, 2, and 5 (the values going into reduce are not in any particular order). Suppose reduce computes a summary value string made of the line numbers sorted into increasing order; then the output of the Reduce phase on the above pairs will be the pairs shown below. The Reduce phase also sorts the output <key, value> pairs into increasing order by key:
<are, 1 4>
<at, 3>
<be, 2 5>
<but, 3>
<least, 3>
<not, 1 4>
<to, 2 5>
<used, 5>
<want, 2>
<we, 1 2 4 5>
<what, 1 4>
Like Map, Reduce is also run in parallel on a group of
machines. Each machine is assigned a subset of the keys to work on (known as a
reduce shard), and outputs its results into a separate file.
Line Indexer Example --
Consider a simple "line indexer" (all of the example code here is available to be checked out, built, and run). Given an input text, the line indexer uses Hadoop to produce an index of all the words in the text. For each word, the index has a list of all the locations where the word appears, with a text excerpt of each line where the word appears. Running the line indexer on the complete works of Shakespeare yields the following entry for the word "cipher":
38624 To cipher what is writ in learned books,
12046 To cipher me how fondly I did dote;
34739 Mine were the very cipher of a function,
16844 MOTH To prove you a cipher.
66001 ORLANDO Which I take to be either a fool or a cipher.
The Hadoop code below for the line indexer is actually
pretty short. The Map code extracts one word at a time from the input, and the
Reduce code combines all the data for one word.
Line Indexer Map --
A Java Mapper class is defined in terms of its input and intermediate <key, value> pairs. To declare one, simply subclass MapReduceBase and implement the Mapper interface. The Mapper interface provides a single method: public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter). Note: these inner classes probably need to be declared "static". If you get an error saying ClassName.<init>() is not defined, try declaring your class static. The map function takes four parameters, which in this example correspond to: the input key (for text input, the byte offset of the line within the file), the input value (the line of text itself), an OutputCollector used to emit intermediate <key, value> pairs, and a Reporter used to report progress.
The Hadoop system divides the (large) input data set into logical "records" and then calls map() once for each record. How much data constitutes a record depends on the input data type; for text files, a record is a single line of text. The main method is responsible for setting the output key and value types.
Since in this example we want to output <word, offset>
pairs, the types will both be Text (a basic string wrapper, with UTF8 support).
It is necessary to wrap the more basic types because all input and output types
for Hadoop must implement WritableComparable, which
handles the writing and reading from disk.
Line Indexer Map Code --
For the line indexer problem, the map code takes in a line of text and, for each word in the line, outputs a string key/value pair <word, offset:line>. The map code below accomplishes that by parsing each word out of value. For the parsing, the code delegates to a utility StringTokenizer object, which implements hasMoreTokens() and nextToken() to iterate through the tokens.
// Requires these imports at the top of the enclosing file:
// java.io.IOException, java.util.StringTokenizer,
// org.apache.hadoop.io.*, org.apache.hadoop.mapred.*
public static class LineIndexerMapper extends MapReduceBase implements Mapper {
    private final static Text word = new Text();
    private final static Text summary = new Text();

    public void map(WritableComparable key, Writable val,
                    OutputCollector output, Reporter reporter)
            throws IOException {
        String line = val.toString();
        // The key is the byte offset of this line in the file;
        // prepend it to the line to form the "offset:line" summary value.
        summary.set(key.toString() + ":" + line);
        StringTokenizer itr = new StringTokenizer(line.toLowerCase());
        while (itr.hasMoreTokens()) {
            // Emit one <word, offset:line> pair per token.
            word.set(itr.nextToken());
            output.collect(word, summary);
        }
    }
}
When run on many machines, each mapper
gets part of the input -- so for example with 100 Gigabytes of data on 200 mappers, each mapper would get
roughly its own 500 Megabytes of data to go through. Within a single mapper, map() is called on the records in their natural order, from start to finish. The Map phase
outputs <key, value> pairs, but what data makes up the key and value is
totally up to the Mapper code. In this case, the Mapper uses each word as a key, so the reduction below ends
up with pairs grouped by word. We could instead have chosen to use the
line-length as the key, in which case the data in the reduce phase would have
been grouped by line length. In fact, the map() code
is not required to call output.collect() at all. It
may have its own logic to prune out data simply by omitting collect. Pruning
things in the Mapper is efficient, since it is highly
parallel, and already has the data in memory. By shrinking its output, we
shrink the expense of organizing and moving the data in preparation for the
Reduce phase.
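As a sketch of such pruning (the class name and the three-character cutoff below are invented for illustration, not part of the assignment code), a mapper can skip data simply by never calling collect for it:
// Hypothetical variant of the line indexer mapper that prunes
// short words (fewer than 3 characters) by not emitting them at all.
public static class PruningMapper extends MapReduceBase implements Mapper {
    private final static Text word = new Text();
    private final static Text summary = new Text();

    public void map(WritableComparable key, Writable val,
                    OutputCollector output, Reporter reporter)
            throws IOException {
        String line = val.toString();
        summary.set(key.toString() + ":" + line);
        StringTokenizer itr = new StringTokenizer(line.toLowerCase());
        while (itr.hasMoreTokens()) {
            String token = itr.nextToken();
            if (token.length() < 3)
                continue;  // prune: no collect call, so the pair is simply dropped
            word.set(token);
            output.collect(word, summary);
        }
    }
}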
Line Indexer Reduce --
Defining a Reducer is just as easy. Simply subclass MapReduceBase and implement the Reducer interface: public
void reduce(WriteableComparable
key, Iterator values, OutputCollector
output, Reporter reporter). The reduce()
method is called once for each key; the values parameter contains all of the
values for that key. The Reduce code looks at all the values and then outputs a
single "summary" value. Given all the values for the key, the Reduce
code typically iterates over all the values and either concats
the values together in some way to make a large summary object, or combines and
reduces the values in some way to yield a short summary value.
The reduce() method produces its final value in the same manner as map() did, by calling output.collect(key, summary). In this way, the Reduce specifies the final output value for the (possibly new) key. It is important to note that when running over text files, the input key is the byte offset within the file. If the key is propagated to the output, even for an identity map/reduce, the file will be filled with the offset values. Not only does this use up a lot of space, but successive operations on this file will have to eliminate them. For text files, make sure you don't output the key unless you need it (be careful with the IdentityMapper and IdentityReducer).
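A minimal sketch of this advice (the class name is hypothetical; it assumes text input, where the value is the line of text and the key is its byte offset):
// Hypothetical mapper that passes lines through while discarding the
// byte-offset key, so offsets never appear in the output file.
public static class DropOffsetMapper extends MapReduceBase implements Mapper {
    private final static Text empty = new Text("");

    public void map(WritableComparable key, Writable val,
                    OutputCollector output, Reporter reporter)
            throws IOException {
        // Emit the line itself as the key; the offset in 'key' is dropped.
        output.collect(new Text(val.toString()), empty);
    }
}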
Line Indexer Reduce Code --
The line indexer Reducer takes in all the <word, offset:line> key/value pairs output by the Mapper for a single word. For example, for the word "cipher", the pairs look like: <cipher, 38624:To cipher what is writ in learned books,>, <cipher, 12046:To cipher me how fondly I did dote;>, <cipher, ... >. Given all those <key, value> pairs, the reduce outputs a single value string. For the line indexer problem, the strategy is simply to concatenate all the values together to make a single large string, using "^" to separate the values. The choice of "^" is arbitrary -- later code can split on the "^" to recover the separate values. So for the key "cipher" the output value string will look like "38624:To cipher what is writ in learned books,^12046:To cipher me how fondly I did dote;^34739:Mine were the very cipher of a function,^...". To do this, the Reducer code simply iterates over values to get all the value strings and concatenates them into our output String.
public static class LineIndexerReducer extends MapReduceBase implements Reducer {
    public void reduce(WritableComparable key, Iterator values,
                       OutputCollector output, Reporter reporter)
            throws IOException {
        boolean first = true;
        StringBuilder toReturn = new StringBuilder();
        while (values.hasNext()) {
            // Separate successive values with the '^' delimiter.
            if (!first)
                toReturn.append('^');
            first = false;
            toReturn.append(values.next().toString());
        }
        output.collect(key, new Text(toReturn.toString()));
    }
}
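Later code can recover the individual excerpts by splitting the summary string on the separator, along these lines (a hypothetical consumer, not part of the assignment code):
// Split a reducer output value back into its "offset:line" entries.
String summaryValue =
    "38624:To cipher what is writ in learned books,^12046:To cipher me how fondly I did dote;";
String[] entries = summaryValue.split("\\^");  // '^' is a regex metacharacter, so escape it
for (String entry : entries) {
    int colon = entry.indexOf(':');
    // e.g. prints "38624 -> To cipher what is writ in learned books,"
    System.out.println(entry.substring(0, colon) + " -> " + entry.substring(colon + 1));
}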
Line Indexer Main Program --
Given the Mapper and Reducer code, the short main() below starts the MapReduce job. The Hadoop system picks up a bunch of values from the command line on its own, and then main() specifies a few key parameters of the problem in the JobConf object, such as which Map and Reduce classes to use and the format of the input and output files. Other parameters, such as the number of machines to use, are optional; the system will determine good values for them if not specified.
public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(LineIndexer.class);
    conf.setJobName("LineIndexer");
    // The keys are words (strings):
    conf.setOutputKeyClass(Text.class);
    // The values are offset:line summaries (strings):
    conf.setOutputValueClass(Text.class);
    conf.setMapperClass(LineIndexer.LineIndexerMapper.class);
    conf.setReducerClass(LineIndexer.LineIndexerReducer.class);
    if (args.length < 2) {
        System.out.println("Usage: LineIndexer <input path> <output path>");
        System.exit(0);
    }
    conf.setInputPath(new Path(args[0]));
    conf.setOutputPath(new Path(args[1]));
    JobClient.runJob(conf);
}
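Assuming the class lives in LineIndexer.java and the input text is already in HDFS under a books directory (the file names, the core jar path, and the directory names below are placeholders; the core jar name varies with the Hadoop version), one plausible way to compile and launch the job from the guest OS is:
javac -classpath /home/guest/hadoop/hadoop-0.13.0-core.jar LineIndexer.java
jar cvf lineindexer.jar LineIndexer*.class
hadoop jar lineindexer.jar LineIndexer books output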
Please answer the following questions in your design document (due one day after the code for this phase is due).