Introduction to Hadoop (Revised 2)
CS 300, Parallel and Distributed Computing (PDC)
Due Monday, December 3, 2018
NEW:
explain about multiple output files
Introduction to Hadoop
C++, Java versions
Word count
index
Preliminary material
Map-reduce model
Hadoop implementation.
HDFS, distributed file system
______
Mappers, Reducers, and types.
Input formats; KeyValueTextInputFormat
Job configuration with JobConf; with command-line options
Identity mappers and identity reducers
Laboratory exercises
Note: The local reference for running Hadoop jobs is the
Hadoop Reference, January 2011 page on the Beowiki. (It needs updating as of 2015.)
Create directories
Create shell windows on a link machine and on one of the cluster head nodes, such as cumulus. Note: For convenient editing, you can enable X forwarding for these SSH sessions as follows:
link% ssh -X cumulus.cs.stolaf.edu
(From a Mac, replace -X with -Y.)
With X forwarding, a new X11 window will pop up on your local computer when you enter an X11 command such as emacs.
On a head node, create ~/PDC/lab11. Change directory to lab11.
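For example (a minimal sketch; adjust the path if you keep your PDC work elsewhere):
cumulus$ mkdir -p ~/PDC/lab11
cumulus$ cd ~/PDC/lab11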
Example of Hadoop code, WordCount.java
We will start with a discussion of the source file WordCount.java.
Java's basic syntax is similar to C or C++. For example, the characters // in the first lines start an in-line comment, and /* ... */ can also be used for comments; also, the syntax rules for Java's for, while, if, etc., are similar to those for C or C++.
The package command in Java provides a naming context for the classes defined in the file WordCount.java. Next, several import commands provide names of classes used later in the file. Some of these import commands draw from standard Java code libraries, and some are specific to Hadoop.
The import commands seem comparable to #include in C or C++, but they're actually more like using namespace. For example,
import org.apache.hadoop.mapred.*;
enables us to write
JobConf conf = new JobConf(WordCount.class);
in the first line of main(), instead of
org.apache.hadoop.mapred.JobConf conf = new org.apache.hadoop.mapred.JobConf(WordCount.class);
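For reference, the top of WordCount.java looks roughly like this (a sketch of a typical old-API word count; the exact import list in your copy may differ slightly):
package edu.stolaf.cs;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;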
The remainder of the file defines a single class WordCount with one class method main() and two nested classes, Map and Reduce. As with C++, one can define one class inside of another class; a nested class is only visible within the scope of its containing class.
The next items delve into main() and these two inner classes.
main() method
public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("WordCount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    //conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(KeyValueTextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
}
The keyword static indicates that main() and the two nested classes have class scope, so Hadoop can use them without constructing a WordCount object.
The method main() corresponds to the main() function of a C++ program. (Incidentally, in Java there are no standalone functions, only methods of classes.) The WordCount class's main() sets up configuration parameters for a Hadoop job, then calls JobClient.runJob() to launch that Hadoop job.
The throws qualifier for main() is about error handling. If a call in the body of main() encounters an error (e.g., a non-existent class that it is expecting), that call will generate an object of class Exception for that error, then forward that object to the Java runtime environment using a mechanism called throwing that Exception object. An Exception object can contain error types and messages, relevant values, and other context information.
The configuration information is packed into a JobConf object called conf. Observe that Java uses new for dynamic allocation, like C++. In Java, variables cannot hold objects per se, but only references to objects (which are like pointers). Thus, unlike C++, if we omit the initialization = new JobConf(...), no default constructor would be called. Instead, only a single memory location for holding the reference would be allocated, awaiting a value from a call to new to allocate that object.
Arguments that end with .class refer to compiled class definitions in Java. For example, Text.class refers to a predefined class named Text that is part of the Hadoop distribution (roughly representing strings of characters). The argument WordCount.class for the JobConf constructor call refers to the result of compiling the WordCount class we are examining. As an example, the line
conf.setInputFormat(KeyValueTextInputFormat.class);
sets the InputFormat configuration parameter. Numerous InputFormat classes are predefined and available in Hadoop, and it's possible to create your own. We chose KeyValueTextInputFormat.class because WMR uses that input format.
The expressions args[0] and args[1] refer to the command-line arguments of main(). These two command-line arguments are the names of the input and output directories (or files) for this Hadoop word-count computation.
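For example, with the launch command used later in this lab,
hadoop jar WordCount.jar edu.stolaf.cs.WordCount mice wc-out1
args[0] receives the HDFS input path mice and args[1] receives the HDFS output path wc-out1.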
Map class
public static class Map extends MapReduceBase
    implements Mapper<Text, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(Text key, Text value,
                  OutputCollector<Text, IntWritable> output,
                  Reporter reporter) throws IOException {
    String line = key.toString();
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
      word.set(tokenizer.nextToken());
      output.collect(word, one);
    }
  }
}
The mapper in this Hadoop map-reduce computation will be the method map() of the class named Map that is defined within the WordCount class.
When defining the Map class, the code
extends MapReduceBase
indicates that Map is a subclass of another class MapReduceBase. Also, the code
implements Mapper<Text, Text, Text, IntWritable>
means that Map includes the methods specified in the Java interface named Mapper. The angle brackets <...> provide type names, just like a template in C++, but this construct is called a generic in Java. The four type values for a Mapper<> indicate the key and value types for the mapper input, then the output key and value types for the mapper.
As we know from WebMapReduce, Hadoop uses strings to transmit data values in mappers and reducers. This means, for example, that an integer data value such as 1 must be turned into a string "1" in order to be emitted (i.e., collected by the output, in Hadoop terminology). Hadoop's Writable classes perform the conversions between data values of various types and strings. For example, the IntWritable object one, defined as the first state variable in the Map class, is essentially a string representation "1" for the integer value 1. In the word count algorithm, we will emit one as the value.
The map() function has four arguments:
the incoming key for a mapper call (a string of Hadoop type Text), which holds a line of the source text in the case of our data file mice.txt;
the incoming value for a mapper call (also Hadoop type Text), which will be empty for our example file;
an OutputCollector<> object output for receiving emits from that call to the mapper; and
a Reporter object for reporting on the status and any errors during that call to the mapper.
The map() method proceeds to
convert the key from a Hadoop Text object to a Java String object,
split that String into words stored in a Java StringTokenizer object (words separated by whitespace by default), then
enter a loop on those words that converts the next word tokenizer.nextToken() from Java String to a Hadoop Text object, then emits that Text object as key and one as value. Note: Emitting in Hadoop is accomplished by a call of output.collect().
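For example, if a mapper call receives the key Three blind mice, three blind mice! (with an empty value), it emits the pairs ("Three", 1), ("blind", 1), ("mice,", 1), ("three", 1), ("blind", 1), ("mice!", 1). Note that punctuation stays attached to the words, since StringTokenizer splits only on whitespace; a later exercise addresses this.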
Reduce class
public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output,
                     Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}
The reducer specified by class Reduce has input keys of type Text, input values of type IntWritable, output keys of type Text, and output values of type IntWritable. For the word count algorithm, the input key represents a word, and each input value always represents the value 1; the output key is that word, and the output value will be the sum of the 1's for that word.
The reduce() method itself has four arguments, very similar to the Map.map() method, except that the second argument of reduce() is an Iterator of IntWritable values instead of a single Text value. As with WMR, an iterator is used because there may be too many (or too large) values to hold them all at once in main memory.
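For example, for the mice.txt data the word blind is emitted three times by the mapper, so the reducer call for the key "blind" receives an iterator over the values 1, 1, 1 and emits ("blind", 3).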
Compiling and running a Hadoop job
First, upload your data to a directory in the HDFS. For this example, you can create an HDFS directory /user/username/mice on the cluster you are using, and upload to that HDFS directory a test file mice.txt containing the following lines.
Three blind mice, three blind mice!
See how they run, see how they run!
They all ran after the farmer's wife,
She cut off their tails with a carving knife.
Did you ever see such a thing in your life
As three blind mice?
Note: Hadoop uses your HDFS home directory as a default directory. Thus, you can accomplish the HDFS operations above with
cumulus$ hadoop fs -mkdir mice
cumulus$ hadoop fs -put mice.txt mice
omitting the absolute path prefixes /user/username/.
Compile and run the Java code by hand.
Copy WordCount.java to your lab11 directory on your cluster's head node (e.g., cumulus), perhaps cutting and pasting into an emacs running on cumulus. Then, issue the following commands to compile that file.
cumulus$ cd ~/PDC/lab11
cumulus$ mkdir classes
cumulus$ javac -classpath /usr/lib/hadoop/hadoop-0.20.1-core.jar -d classes WordCount.java
cumulus$ jar cvf WordCount.jar -C classes edu
cumulus$ hadoop jar WordCount.jar edu.stolaf.cs.WordCount mice wc-out1
The subdirectory classes is a destination for the .class files created during compilation. After compilation, the directory ~/PDC/lab11/classes/edu/stolaf/cs/ will hold the following files:
WordCount$Map.class
WordCount$Reduce.class
WordCount.class
The subdirectory structure is determined by the package instruction in the Java source file WordCount.java.
The javac command performs the actual compilation. The flag -classpath /usr/lib/hadoop/hadoop-0.20.1-core.jar informs the Java compiler where to find the precompiled Hadoop libraries.
Notes:
On mist, use -classpath /usr/lib/hadoop/hadoop-core-1.1.2.jar.
On cumulus, the javac compilation will produce deprecation warnings, which can be ignored for this course.
The jar command packages up the contents of the subdirectory classes into a single file, called a "jar" (Java archive).
Finally, we launch the Hadoop job with the hadoop jar command. The command-line arguments are
the jar file that contains the compiled code,
the class in that jar that contains main(),
the HDFS path to the input, and
an HDFS path for a new directory to receive the output from this Hadoop run. Note: Hadoop creates this output directory, which must not already exist.
Since Hadoop interprets relative HDFS paths relative to your home directory /user/username/, we can omit that prefix and specify the input and output directories simply as mice and wc-out1.
Hadoop produces a considerable amount of output on the command line about the progress and completion of the job.
After the job has finished running successfully, the output directory wc-out1 (i.e., /user/username/wc-out1, for your username) on HDFS should contain the results in a file part-00000 (assuming there is a single reduce process).
To examine that output, first list the files in the HDFS directory, then page through the output file.
cumulus$ hadoop fs -ls wc-out1
cumulus$ hadoop fs -cat wc-out1/part-00000 | more
The first of these commands should display two HDFS entries: a subdirectory _logs (which records information about the job) and the output file /user/username/wc-out1/part-00000. The second command prints the output file on standard output, one page at a time (enter a space to see the next page).
ALSO DO THIS: Record the commands you used for this step in a file
lab11/README. Label this documentation as performing a by-hand submission of a Hadoop job.
Save your work in a commit.
cumulus$ git add README WordCount.java mice.txt
cumulus$ git commit -m "lab11: First Hadoop job, submitted at the command line. WordCount.java"
Using the St. Olaf "Hadoop
Makefile" to compile and run a Hadoop job.The commands for compiling and running a Hadoop job are somewhat tedious and error prone. Not only must you remember to enter all the commands in the correct order, but you need to create a new output directory on HDFS with each run.
Therefore, we have developed a local Hadoop
Makefileto compile and run Hadoop jobs, then view the results.First, copy the Hadoop
Makefileto yourlab11directory oncumulus.Note: If you are on
mist, delete the#at the beginning of this line in theMakefile:# HADOOP_JAR = /user/lib/hadoop/hadoop-core-1.1.2.jar # mist 1/2015
Now perform the command
cumulus$ touch WordCount.java
to change the modification date for that file (without changing the content of the file), and enter
cumulus$ make WordCount.hadoop DATA=mice
The .hadoop at the end of this make target requests make to compile the job with javac, create WordCount.jar, then launch a Hadoop job using that .jar file and a fresh output directory name on the HDFS. Note: The command-line argument DATA=mice specifies the input directory /user/username/mice on HDFS, and the Makefile chooses a fresh output directory within an HDFS directory /user/username/out/.
To examine the output from a successful run, enter
cumulus$ make print | more
The Hadoop Makefile keeps track of the most recent output directory name and uses hadoop fs -cat to display the results on standard output.
Add instructions for using the Hadoop Makefile to your README file, then save your work in a commit.
cumulus$ git add README Makefile
cumulus$ git commit -m "lab11: Running Hadoop job with the St. Olaf Hadoop Makefile"
Features of the Hadoop Makefile
The Hadoop Makefile records information about the most recent job. The make print example above indicates one aspect of this. These are not general features of make, but extra capabilities implemented in this particular Makefile.
The Hadoop Makefile uses that recorded information to launch the next job more conveniently, too. To see this, carry out these steps:
Make a small but noticeable change in WordCount.java, namely, initializing the value of the IntWritable state variable one to 2 instead of 1.
private final static IntWritable one = new IntWritable(2);
This should double all the word frequencies produced by the program. Check this by entering
cumulus$ make hadoop
cumulus$ make print
The invocation make hadoop should recompile and rerun your WordCount job, the last .hadoop target entered, starting with the edited .java code. The make print command should display the output, showing that all word counts are doubled.
Be sure to change the definition of one back to 1 in WordCount.java, so that code will be correct again.
Here is a list of features of the Hadoop Makefile.
Option to specify the data set for a Hadoop job using the command-line argument DATA=dir. Here, dir is automatically interpreted relative to /user/username unless dir is an absolute path (starting with /).
Generates a new HDFS output directory name for each Hadoop run, based on the name of the class (e.g., WordCount). Output directories from Hadoop runs are stored by default in an HDFS subdirectory /user/username/out.
Option to specify a desired output directory on the HDFS with the command-line argument OUTDIR=dir, where dir is interpreted relative to /user/username if it is not absolute (as with DATA). Note: dir is not created in the out subdirectory unless you include out/ at the beginning of dir.
The output subdirectory name out can be changed with a command-line argument OUTPARENT=dir.
Saves the class name for the most recent ClassName.hadoop target. The next target hadoop becomes equivalent to ClassName.hadoop.
Saves DATA, the job ID number (used in the output directory name), and the output directory name for the most recent run of each program ClassName.java. This information is retrieved by the hadoop and print targets.
Saves the OUTPARENT value from the most recent Hadoop run (not per ClassName).
Examples:
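Here is a sketch of some invocations these features allow; the directory names below are illustrative, not required.
cumulus$ make WordCount.hadoop DATA=mice
compiles (if needed) and runs WordCount on /user/username/mice, writing output under /user/username/out/.
cumulus$ make hadoop DATA=mice2
reruns the most recently named ClassName.hadoop target on /user/username/mice2.
cumulus$ make WordCount.hadoop DATA=/shared/gutenberg/all_n/group11
uses an absolute HDFS input path as-is.
cumulus$ make WordCount.hadoop DATA=mice OUTDIR=wc-make-out
writes output to /user/username/wc-make-out instead of a generated directory under out/.
cumulus$ make print | more
displays the results of the most recent run.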
Note on using Eclipse with Hadoop (optional)
Note: Please do not run Eclipse on head nodes, since it takes so many resources. If you are familiar with Eclipse and want to edit Java with it, be sure to enter the eclipse command on a Link machine, and use scp to transfer the resulting .java files to a cluster head node.
Some first Hadoop programming exercises
Create a copy WordCountValue.java of WordCount.java, and edit that copy to count word frequencies when the text is in the value instead of the key.
Create a file mice2.txt as test input for this Hadoop program, containing the following:
mice:1 Three blind mice, three blind mice!
mice:2 See how they run, see how they run!
mice:3 They all ran after the farmer's wife,
mice:4 She cut off their tails with a carving knife.
mice:5 Did you ever see such a thing in your life
mice:6 As three blind mice?
Note: Separate each mice:N key from the rest of its line with a tab character, since tab is the default key/value separator for KeyValueTextInputFormat.
Create a new HDFS directory mice2 (a subdirectory of /user/username) and upload your file mice2.txt to it.
The file WordCountValue.java needs these changes.
In Java, the name of the file indicates the name of a class defined in that file. So, change the name of the (outer) class to WordCountValue instead of WordCount.
Also change the constructor argument for the JobConf() object from WordCount.class to WordCountValue.class, and modify the job name configuration setting.
Modify the mapper to split the value instead of the key, as sketched below. (See the discussion of WordCount.java above if you're having trouble figuring out what to change.)
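Here is a minimal sketch of what the modified map() method might look like, assuming the rest of the file is otherwise unchanged; it simply tokenizes value rather than key:
  public void map(Text key, Text value,
                  OutputCollector<Text, IntWritable> output,
                  Reporter reporter) throws IOException {
    String line = value.toString();   // split the value field, not the key
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
      word.set(tokenizer.nextToken());
      output.collect(word, one);      // emit (word, 1) as before
    }
  }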
To compile and run your new Hadoop job, enter
cumulus$ make WordCountValue.hadoop DATA=mice2
If you need to make corrections, you can try again using
cumulus$ make hadoop
since the name WordCountValue will now be the Hadoop Makefile's remembered name. Finally, enter
cumulus$ make print
to see the output, which should be identical to the output from the original (key-splitting) WordCount.java with DATA=mice.
Carry out these steps to be certain that the two programs are producing identical data.
Save the HDFS output from the WordCountValue job to a file in your lab11 directory on cumulus.
cumulus$ make print > wcv.out
Now, re-run the WordCount job.
cumulus$ make WordCount.hadoop
Now, you can use the Linux utility diff to compare the output from each job character by character:
cumulus$ make print | diff - wcv.out
1c1
< hadoop fs -cat out/WordCount.29348/part-00000
---
> hadoop fs -cat out/WordCountValue.26066/part-00000
This output from diff indicates that all the key-value pairs were identical, and only the hadoop fs commands reported by make differed between the two runs. (The numbers 29348 and 26066 are job numbers assigned by the Hadoop Makefile, and they would differ even if we compared two runs of the same .java program.)
Save your work in a commit.
cumulus$ git add WordCountValue.java
cumulus$ git commit -m "lab11: Word count with text in the value field of input"
Hadoop performance and scalability.
Try timing some runs of an already-compiled Hadoop program.
Since WordCountValue.jar already exists, the Hadoop Makefile won't rebuild it if we launch more WordCountValue jobs without modifying the source file WordCountValue.java. So, it's reasonable to compare approximate performance for Hadoop runs of that .jar using time. Try the following, and record your timing results.
cumulus$ time make WordCountValue.hadoop DATA=mice2
cumulus$ time make hadoop DATA=lab10
The Hadoop Makefile will reuse WordCountValue.jar in both cases. Record timing results and make observations in a file ~/PDC/lab11/README.
Notes:
In one sample run on cumulus in January 2015, the timing for DATA=mice2 was
real 0m22.900s
user 0m0.921s
sys  0m0.078s
which indicates that the local processing to launch the Hadoop job took about 1 second, even though the elapsed time was about 23 seconds. Thus, the bulk of the time was spent blocking for Hadoop services on the cluster to do their work. There are only 220 bytes in mice2.txt, but Hadoop incurs lots of overhead preparing for potentially terascale or larger processing.
In another run with about 20,000 bytes of data, the timings were
real 0m24.899s
user 0m0.949s
sys  0m0.078s
Thus, the local user+sys time was only about 0.028 sec longer and the total elapsed real time was about 2 sec longer, which further indicates that most of the time is overhead, and little is needed to actually process such small data.
In a third run with DATA=/shared/gutenberg/all_n/group11 (294 Project Gutenberg books with line numbers in the key field, totalling about 125 MB), the timings were
real 1m2.239s
user 0m1.275s
sys  0m0.104s
It took about 2.5 times as many seconds to process over 6000 times as much data. Hadoop is certainly designed to scale well as the data size increases!
Since cumulus has only 13 older quad-core nodes running on gigabit Ethernet at the time of these tests, this scalability probably won't continue indefinitely as the data size increases (e.g., it would probably take more than 3 minutes to process a terabyte of data, which is about 6000 times the size of group11). But given thousands of nodes for a cluster with better hardware and networking, it's quite believable that Hadoop can compute at petascale with acceptable performance.
ALSO DO THIS: Record your observations in README, then save your work in a commit.
cumulus$ git add README
cumulus$ git commit -m "lab11: Timing results for WordCountValue"
Lowering case and removing punctuation
The Java String class has useful methods for operating on strings, including toLowerCase(), substring(), and contains(), which can be used to test whether a string of punctuation characters contains a particular character.
Here is a test program Try.java that demonstrates how these methods can be used to check whether the first characters of some words (entered as command-line arguments) are vowels:
public class Try {
  public static void main(String args[]) {
    int i;
    for (i = 0; i < args.length; i++)
      if ("aeiou".contains(args[i].toLowerCase().substring(0, 1)))
        System.out.println(args[i]);
  }
}
Notes:
No import statements are needed to use the name String.
The command-line arguments for main() are provided as an array of String, which we are calling args[] (but which could be called something else).
Java, like C++, does accept a declaration within the initialization of a for loop, as in for (int i = ...); declaring i before the loop here is simply a style choice.
Here, we call the contains() method of the String "aeiou".
The charAt() method of String is the usual way to examine a single character, but contains() expects a String (strictly, a CharSequence) argument, not a char. It was simpler to use substring() to retrieve the first character as a String than to convert the char value returned by charAt() to a String.
System.out.println(...) is the Java counterpart of C++'s cout << ... << endl.
To compile and run this test program, enter
cumulus$ javac Try.java
cumulus$ java Try Apple banana Mango orange
Apple
orange
The two lines of output for the four command-line arguments are shown above.
DO THIS: Apply these methods to create a Hadoop program WordCountValue2.java that counts frequencies of words after lowering the case and removing punctuation before and after the letters.
Hint: You can use a while loop to find the index of the first non-punctuation character at the beginning of a string, and a second while loop to find the index of the last non-punctuation character at the end of the string. Once you know those two indices, you can use substring() to obtain a word with punctuation removed. A sketch of one possible helper method follows.
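Here is a minimal sketch of such a helper, assuming a punctuation string of your own choosing; the name cleanWord() and the PUNCT constant are illustrative, not part of the starter code. The mapper would call it on each token before emitting.
  private final static String PUNCT = ".,;:!?'\"()-";

  // Lower the case, then strip punctuation from the front and back of a word.
  // Returns the empty string if the token is entirely punctuation.
  private static String cleanWord(String token) {
    String word = token.toLowerCase();
    int first = 0;
    while (first < word.length() && PUNCT.contains(word.substring(first, first + 1)))
      first++;
    int last = word.length() - 1;
    while (last >= first && PUNCT.contains(word.substring(last, last + 1)))
      last--;
    return word.substring(first, last + 1);
  }
For example, cleanWord("Mice,") returns "mice", and cleanWord("!!!") returns the empty string (which the mapper can simply skip).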
Test your program WordCountValue2.java on DATA=mice2, and determine whether it counted accurately by comparing against wcv.out. For example, the frequency of the word three for WordCountValue2.java should equal the sum of the frequencies of the words Three and three in wcv.out.
ALSO DO THIS: Record your work in a commit.
cumulus$ git add WordCountValue2.java
cumulus$ git commit -m "lab11: WordCountValue ignoring case and punctuation"
OPTIONAL: Index computation
To compute an index with Hadoop directly, recall the IN/OUT spec for index in a prior WMR lab.
OPTIONAL: Combiner
Observe that our code for main() includes the instruction
//conf.setCombinerClass(Reduce.class);
which suggests that these computations can make use of true Hadoop combiners. Thus, the key-value pairs sent to the reducer shouldn't all have value 1 in general. For mice2.txt, words such as blind and how should appear with value 2 emerging from the combiner. Verify this by removing // from the beginning of that line and showing the sequence of values received in each call of the reducer.
Make a copy CombinerTest.java of WordCountValue.java, and change the class name, job name, and JobConf() constructor parameter accordingly. (Consider a global substitution in your editor.) Also, remove // from the beginning of the line with the conf.setCombinerClass() call.
Copy the inner class Reduce, and change the copy's class name to Combine. (The Combine class should still implement Reducer<>, have a method reduce(), etc. -- only the one name needs to be changed.) Also, set the class Combine.class as the combiner instead of Reduce.class:
conf.setCombinerClass(Combine.class);
Compile and run CombinerTest.java with DATA=mice2. Compare to wcv.out to verify that it produces the same output as a WordCountValue job with that input data.
Now, change just the Reduce class to accumulate the incoming values from the iterator and append them to the frequency when emitting. The iterator delivers Hadoop IntWritable objects, not Java Strings, so each incoming value must be converted when building the output string.
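One possible way to carry out that last step (a sketch under assumptions, not the required solution): make the reducer's output value type Text, which also means calling conf.setOutputValueClass(Text.class) and conf.setMapOutputValueClass(IntWritable.class) in main() so the mapper and combiner still exchange IntWritable values. The modified Reduce might then look roughly like this:
  public static class Reduce extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, Text> {
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, Text> output,
                       Reporter reporter) throws IOException {
      int sum = 0;
      StringBuilder seen = new StringBuilder();   // the sequence of values received
      while (values.hasNext()) {
        int v = values.next().get();
        sum += v;
        seen.append(" ").append(v);
      }
      output.collect(key, new Text(sum + " from values:" + seen));
    }
  }
With the combiner enabled, a word such as how should show a combined value (e.g., 2) rather than a run of separate 1's.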
Deliverables
All of your code for this assignment should already be
contained in commits. Modify the most recent
commit message to indicate that you are submitting
the completed assignment.
cumulus$ git commit --amend
Add the following to the latest commit message.
lab11: complete
If your assignment is not complete, indicate your progress instead, e.g.,
lab11: items 1-5 complete, item 6 partial
You can make later commits to submit updates.
Finally, pull/push your commits in
the usual way.
cumulus$ git pull origin master
cumulus$ git push origin master
Files: README WordCount.java mice.txt Makefile WordCountValue.java WordCountValue2.java
This lab is due by Monday, December 3, 2018.