Introduction to Hadoop (Revised 2)

CS 300, Parallel and Distributed Computing (PDC)
Due Monday, December 3, 2018

NEW:

  • explain about multiple output files

  • ______

  • ______

  • ______

  • ______

  • ______

  • ______

  • ______

  • ______

  • ______

  • ____________
______

Introduction to Hadoop

  • C++, Java versions

  • Word count

  • index

Preliminary material

  • Map-reduce model

  • Hadoop implementation.

  • HDFS, distributed file system

  • ______


  • Mappers, Reducers, and types.

  • Input formats; KeyValueTextInputFormat

  • Job configuration with JobConf; with command-line options

  • Identity mappers and identity reducers

Laboratory exercises

Note: The local reference for running Hadoop jobs is the Hadoop Reference, January 2011 page on the Beowiki. (It needs updating as of 2015.)

  1. Create directories

    • Create shell windows on a link machine and on one of the cluster head nodes, such as cumulus.

      Note: For convenient editing, you can enable X forwarding for these SSH sessions as follows:

      link%  ssh -X cumulus.cs.stolaf.edu
      
      (From a Mac, replace -X by -Y.)
      With X forwarding, a new X11 window will pop up on your local computer when you enter an X11 command such as emacs.

    • On a head node, create ~/PDC/lab11. Change directory to lab11.

  2. Example of Hadoop code, WordCount.java.

    We will start with a discussion of the source file, WordCount.java

    • Java's basic syntax is similar to C or C++. For example, the characters // in the first lines start an in-line comment, and /* ... */ can also be used for comments; also, the syntax rules for Java for, while, if, etc., are similar to those for C or C++.

    • The package command in Java provides a naming context for the classes defined in the file WordCount.java. Next, several import commands provide names of classes used later in the file. Some of these import commands draw from standard Java code libraries, and some are specific to Hadoop.

      The import commands seem comparable to #include in C or C++, but they're actually more like using namespace. For example,

      import org.apache.hadoop.mapred.*;
      
      enables us to write
      JobConf conf = new JobConf(WordCount.class);
      
      in the first line of main(), instead of
      org.apache.hadoop.mapred.JobConf conf = new org.apache.hadoop.mapred.JobConf(WordCount.class);
      

    • The remainder of the file defines a single class WordCount with one class method main(), and two nested classes Map and Reduce. As with C++, one can define one class inside of another class; a nested class's name belongs to the scope of its containing class, so code outside that class refers to it with a qualified name such as WordCount.Map.

      The next items delve into main() and these two inner classes.
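
The following is a minimal sketch of a static nested class, written only to illustrate the structure described above. The names Outer, Doubler, and demo() are hypothetical and do not appear in WordCount.java.

```java
// Hypothetical example (not part of WordCount.java): a static nested class
// can be used without ever constructing an object of the outer class.
class Outer {
    // Static nested class, analogous in structure to WordCount.Map and WordCount.Reduce.
    static class Doubler {
        int apply(int x) {
            return 2 * x;
        }
    }

    // Class method, analogous in structure to WordCount.main().
    static int demo() {
        Outer.Doubler d = new Outer.Doubler();  // no Outer object is needed
        return d.apply(21);
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints 42
    }
}
```

Inside Outer, the nested class could be written simply as Doubler; the qualified name Outer.Doubler is what code outside of Outer would use.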

    main() method

      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");
        
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
    
        conf.setMapperClass(Map.class);
        //conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);
    
        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
    
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        
        JobClient.runJob(conf);
      }
    
    • The keyword static indicates that main() and the two classes have class scope, so Hadoop can use them without constructing a WordCount object.

    • The method main() corresponds to the main() function of a C++ program. (Incidentally, in Java, there are no standalone functions, but only methods of classes.) The WordCount class's main() sets up configuration parameters for a Hadoop job, then calls JobClient.runJob() to launch that Hadoop job.

      The throws qualifier for main() is about error handling. If a call in the body of main() encounters an error (e.g., a nonexistent class that it is expecting), that call will generate an object of class Exception for that error, then will forward that object to the Java runtime environment using a mechanism called throwing that Exception object.

      The Exception object can contain error types and messages, relevant values, and other context information.
    • The configuration information is packed into a JobConf object called conf. Observe that Java uses new for dynamic allocation, like C++.

      In Java, variables cannot contain objects per se, but only references to objects (which are like pointers). Thus, unlike C++, if we omit the initialization = new JobConf(...), no default constructor would be called. Instead, only a single memory location for holding the reference would be allocated, awaiting a value from a call to new to allocate that object.

    • Arguments that end with .class refer to compiled class definitions in Java. For example, Text.class refers to a predefined class named Text that is part of the Hadoop distribution (roughly representing strings of characters). The argument WordCount.class in the constructor call for JobConf refers to the result of compiling the WordCount class we are examining.

      As an example, the line

      conf.setInputFormat(KeyValueTextInputFormat.class);
      
      sets the InputFormat configuration parameter. Numerous InputFormat classes are predefined and available in Hadoop, and it's possible to create your own. We chose KeyValueTextInputFormat.class because WMR uses that input format.

    • The expressions args[0] and args[1] refer to command-line arguments for main(). These two command-line arguments are the names of the input and output directories or files for this Hadoop word-count computation.
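
The points above about references, new, and throws can be tried out in a small stand-alone program. This is only a sketch: the class RefDemo, its field label, and the method firstArg() are hypothetical names chosen for illustration, and no Hadoop classes are involved.

```java
// Hypothetical example: uninitialized references and thrown exceptions.
class RefDemo {
    // A reference only; no StringBuilder object is constructed here, so it starts as null.
    static StringBuilder label;

    // Returns the first command-line argument, or throws an Exception if there is none.
    static String firstArg(String[] args) throws Exception {
        if (args.length < 1) {
            throw new Exception("usage: RefDemo input-path");
        }
        return args[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println(label == null);    // true: nothing has been allocated yet
        label = new StringBuilder("input=");  // new allocates the object
        label.append(firstArg(args));         // may throw; main() passes the Exception along
        System.out.println(label);
    }
}
```

Running this program with no command-line arguments shows the Java runtime reporting the thrown Exception, which is the same route an error takes out of WordCount's main().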

    Map class

      public static class Map extends MapReduceBase implements Mapper<Text, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
    
        public void map(Text key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
          String line = key.toString();
          StringTokenizer tokenizer = new StringTokenizer(line);
          while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
          }
        }
      }
    
    • The mapper in this Hadoop map-reduce computation will be the method map() of the class named Map that is defined within the WordCount class.

    • When defining the Map class, the code

      extends MapReduceBase
      
      indicates that Map is a subclass of another class MapReduceBase. Also, the code
      implements Mapper<Text, Text, Text, IntWritable>
      
      means that Map includes the methods specified in the Java Interface named Mapper. The angle brackets <...> provide type names, just like a template in C++, but it is called a generic in Java.

      The four type values for a Mapper<> indicate the key and value types for the mapper input, then the output key and value types for the mapper.

    • As we know from WebMapReduce, data values must be converted to a transmittable form in order to pass between mappers and reducers. In WMR that form is a string; in Hadoop's Java API, Writable classes play this role, wrapping data values of various types and handling their conversion to and from serialized form (and to text in the final output). This means, for example, that an integer data value such as 1 must be wrapped in an IntWritable in order to be emitted (i.e., collected by the output, in Hadoop terminology). For example, the IntWritable object one defined as the first state variable in the Map class is a Writable representation of the integer value 1. In the word count algorithm, we will emit one as the value.

    • The map() function has four arguments:

      • the incoming key for a mapper call (a string of Hadoop type Text) holds a line of the source text in the case of our data file mice.txt;

      • the incoming value for a mapper call (also Hadoop type Text) will be empty for our example file;

      • an OutputCollector<> object output for receiving emits from that call to the mapper; and

      • a Reporter object for reporting on the status and any errors during that call to the mapper.

      The map() method proceeds to
      1. convert the key from a Hadoop Text object to a Java String object,

      2. split that String into words stored in a Java StringTokenizer object (words separated by whitespace by default), then

      3. enter a loop on those words that converts the next word tokenizer.nextToken() from Java String to a Hadoop Text object, then emits that Text object as key and one as value. Note: Emitting in Hadoop is accomplished in a call of output.collect().
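
The three steps above can be tried without a cluster. The sketch below is plain Java with no Hadoop classes: the class MapSketch is hypothetical, a Java String stands in for the Text key, and a List of "word<TAB>1" strings stands in for the OutputCollector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Plain-Java sketch of the map step; no Hadoop classes are used.
class MapSketch {
    // Stand-in for map(): each emitted pair is rendered as "word<TAB>1".
    static List<String> map(String key) {
        List<String> emitted = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(key);  // splits on whitespace by default
        while (tokenizer.hasMoreTokens()) {
            emitted.add(tokenizer.nextToken() + "\t1");        // stand-in for output.collect(word, one)
        }
        return emitted;
    }

    public static void main(String[] args) {
        for (String pair : map("Three blind mice, three blind mice!")) {
            System.out.println(pair);
        }
    }
}
```

Note that the tokens mice, and mice! keep their punctuation, and Three and three differ in case, so they count as different words at this stage.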

    Reduce class

      public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) 
          throws IOException {
          int sum = 0;
          while (values.hasNext()) {
            sum += values.next().get();
          }
    
          output.collect(key, new IntWritable(sum));
        }
      }
    
    • The reducer specified by class Reduce has input keys of type Text, input values of type IntWritable, output keys of type Text, and output values of type IntWritable. For the word count algorithm, the input key represents a word, and each input value always represents the value 1; the output key is that word, and the output value will be the sum of the 1's for that word.

    • The reduce() method itself has four arguments, very similar to the Map.map() method, except that the second argument of reduce() is an Iterator of IntWritable values, instead of a single Text value. As with WMR, an iterator is used because there may be too many (or too large) values to hold them all at once in main memory.
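
The summing loop can likewise be sketched in plain Java. Here the hypothetical class ReduceSketch uses Iterator<Integer> in place of Iterator<IntWritable> and returns the sum instead of collecting it; no Hadoop classes are involved.

```java
import java.util.Arrays;
import java.util.Iterator;

// Plain-Java sketch of the reduce step; no Hadoop classes are used.
class ReduceSketch {
    // Mirrors Reduce.reduce(): consume the values one at a time and return their sum.
    static int reduce(String key, Iterator<Integer> values) {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next();
        }
        return sum;
    }

    public static void main(String[] args) {
        // The word blind occurs three times in mice.txt, so its reducer call sees three 1s.
        Iterator<Integer> ones = Arrays.asList(1, 1, 1).iterator();
        System.out.println("blind\t" + reduce("blind", ones));
    }
}
```

Because the loop only ever holds one value at a time, it works the same way whether the iterator delivers three values or three billion.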

  3. Compiling and running a Hadoop job

    1. First, upload your data to a directory in the HDFS. For this example, you can create an HDFS directory /user/username/mice on the cluster you are using, and upload a test file mice.txt to that HDFS directory containing the following lines.

      Three blind mice, three blind mice!
      See how they run, see how they run!
      They all ran after the farmer's wife,
      She cut off their tails with a carving knife.
      Did you ever see such a thing in your life
      As three blind mice?
      
      Note: Hadoop uses your HDFS home directory as a default directory. Thus, you can accomplish the HDFS operations above with
      cumulus$  hadoop fs -mkdir mice
      cumulus$  hadoop fs -put mice.txt mice
      
      omitting the absolute path prefixes /user/username/

    2. Compile and run the Java code by hand.

      Copy WordCount.java to your lab11 directory on your cluster's head node (e.g., cumulus), perhaps cutting and pasting into an emacs running on cumulus. Then, issue the following commands to compile that file.

      cumulus$  cd ~/PDC/lab11 
      cumulus$  mkdir classes
      cumulus$  javac -classpath /usr/lib/hadoop/hadoop-0.20.1-core.jar -d classes WordCount.java
      cumulus$  jar cvf WordCount.jar -C classes edu
      cumulus$  hadoop jar WordCount.jar edu.stolaf.cs.WordCount mice wc-out1
      
      Notes:

      • The subdirectory classes is a destination for the .class files created during compilation. After compilation, the directory ~/PDC/lab11/classes/edu/stolaf/cs/ will hold the following files:

        WordCount$Map.class  WordCount$Reduce.class  WordCount.class
        
        The subdirectory structure is determined by the package instruction in the Java source file WordCount.java.

      • The javac command performs the actual compilation. The flag -classpath /usr/lib/hadoop/hadoop-0.20.1-core.jar informs the Java compiler where to find the precompiled Hadoop libraries.

        Notes:

        • On mist, use -classpath /usr/lib/hadoop/hadoop-core-1.1.2.jar .

        • On cumulus, the javac compilation will produce deprecation warnings, which can be ignored for this course.

      • The jar command packages up the contents of the subdirectory classes into a single file, called a "jar" (Java archive).

      • Finally, we launch the Hadoop job with the hadoop jar command. The command-line arguments are

        • the jar file that contains the compiled code,

        • the class in that jar that contains main(),

        • the HDFS path to the input, and

        • an HDFS path for a new directory to receive the output from this Hadoop run.
          Note: Hadoop creates this directory, which must not already exist.

        Since the third and fourth arguments are subdirectories of your HDFS directory /user/username/, we can omit that prefix and specify the input and output directories simply as mice and wc-out1.
      • Hadoop produces a considerable amount of output on the command line about the progress and completion of the job.

      • After the job has finished running successfully, the output directory wc-out1 (i.e., /user/username/wc-out1, for your username) on HDFS should contain the results in a file part-00000 (assuming there is a single reduce process).

      To examine that output, first list the files in the HDFS directory.

      cumulus$   hadoop fs -ls wc-out1
      cumulus$   hadoop fs -cat wc-out1/part-00000 | more
      
      The first of these commands should display two HDFS entries, a subdirectory _logs (which records information about the job) and the output file /user/username/wc-out1/part-00000. The second command prints the output file on standard output, one page at a time (enter a space to see the next page).

      ALSO DO THIS: Record the commands you used for this step in a file lab11/README. Label this documentation as performing a by-hand submission of a Hadoop job.

      Save your work in a commit.

      cumulus$  git add README WordCount.java mice.txt
      cumulus$  git commit -m "lab11: First Hadoop job, submitted at the command line.  WordCount.java"
      

    3. Using the St. Olaf "Hadoop Makefile" to compile and run a Hadoop job.

      The commands for compiling and running a Hadoop job are somewhat tedious and error-prone. Not only must you remember to enter all the commands in the correct order, but you need to create a new output directory on HDFS with each run.

      Therefore, we have developed a local Hadoop Makefile to compile and run Hadoop jobs, then view the results.

      • First, copy the Hadoop Makefile to your lab11 directory on cumulus.

        Note: If you are on mist, delete the # at the beginning of this line in the Makefile:

        # HADOOP_JAR = /usr/lib/hadoop/hadoop-core-1.1.2.jar  # mist 1/2015
        

      • Now perform the command

        cumulus$  touch WordCount.java
        
        to change the modification date for that file (without changing the content of the file), and enter
        cumulus$  make WordCount.hadoop DATA=mice
        
        The .hadoop at the end of this make target requests make to compile the job with javac, create WordCount.jar, then launch a Hadoop job using that .jar file and a fresh output directory name on the HDFS. Note: The command-line argument DATA=mice specifies the input directory /user/username/mice on HDFS, and the Makefile chooses a fresh output directory within an HDFS directory /user/username/out/.

      • To examine the output from a successful run, enter

        cumulus$  make print | more
        
        The Hadoop Makefile keeps track of the most recent output directory name and uses hadoop fs -cat to display the results on standard output.

      Add instructions for using the Hadoop Makefile to your README file, then save your work in a commit.

      cumulus$  git add README Makefile
      cumulus$  git commit -m "lab11: Running Hadoop job with the St. Olaf Hadoop Makefile"
      

    4. Features of the Hadoop Makefile

      • The Hadoop Makefile records information about the most recent job. The make print example above indicates one aspect of this. These are not general features of make, but instead are extra capabilities implemented in this particular Makefile.

      • The Hadoop Makefile uses that recorded information to launch the next job more conveniently, too.

        To see this, carry out these steps:

        1. Make a small but noticeable change in WordCount.java, namely, initializing the value of the IntWritable state variable one to 2 instead of 1.

          private final static IntWritable one = new IntWritable(2);
          
          This should double all the word frequencies produced by the program. Check this by entering
          cumulus$  make hadoop
          cumulus$  make print
          
          The invocation make hadoop should recompile and rerun your WordCount job, the last .hadoop target entered, starting with the edited .java code. The make print command should display the output, showing that all word counts are doubled.

        2. Be sure to change the definition of one back to 1 in WordCount.java, so that code will be correct again.

      Here is a list of features of the Hadoop Makefile

      • Option to specify data set for a Hadoop job using command-line argument DATA=dir. Here, dir is automatically relative to /user/username unless dir is an absolute path (starting with /).

      • Generates a new HDFS output directory name for each Hadoop run, based on the name of the class (e.g., WordCount). Output directories from Hadoop runs are stored by default in an HDFS subdirectory /user/username/out.

      • Option to specify a desired output directory on the HDFS with command-line argument OUTDIR=dir, where dir is interpreted relative to /user/username if it is not absolute (as with DATA). Note: dir is not created in the out subdirectory, unless you include out/ at the beginning of dir.

      • The output subdirectory name out can be changed by a command-line argument OUTPARENT=dir.

      • Saves the class name for the most recent ClassName.hadoop target. The next target hadoop becomes equivalent to ClassName.hadoop .

      • Saves DATA, job ID number (used in output directory name), and output directory name for the most recent run of each program ClassName.java. This is retrieved by hadoop and print targets.

      • Saves the OUTPARENT value from the most recent Hadoop run (not per ClassName).

      Examples:

      ______
  4. Note on using Eclipse with Hadoop (optional)

    Note: Please do not run Eclipse on head nodes, since it takes so many resources. If you are familiar with Eclipse and want to edit Java with it, be sure to enter the eclipse command on a Link machine, and use scp to transfer the resulting java files to a cluster head node.

Some first Hadoop programming exercises

  1. Create a copy WordCountValue.java of WordCount.java, and edit that copy to count word frequencies when the text is in the value instead of the key.

    1. Create a file mice2.txt as test input for this Hadoop program, containing the following:

      mice:1	Three blind mice, three blind mice!
      mice:2	See how they run, see how they run!
      mice:3	They all ran after the farmer's wife,
      mice:4	She cut off their tails with a carving knife.
      mice:5	Did you ever see such a thing in your life
      mice:6	As three blind mice?
      
      Create a new HDFS directory mice2 (a subdirectory of /user/username) and upload your file mice2.txt to it.

    2. The file WordCountValue.java needs these changes.

      • In Java, the name of the file indicates the name of a class defined in that file. So, change the name of the (outer) class to WordCountValue instead of WordCount.

      • Also change the constructor argument for the JobConf object from WordCount.class to WordCountValue.class, and modify the job name configuration setting.

      • Modify the mapper to split the value instead of the key. (See the discussion of WordCount.java above if you're having trouble figuring out what to change.)

    3. To compile and run your new Hadoop job, enter

      cumulus$  make WordCountValue.hadoop DATA=mice2
      
      If you need to make corrections, you can try again using
      cumulus$  make hadoop
      
      since the name WordCountValue will now be the Hadoop Makefile's remembered name. Finally, enter
      cumulus$  make print
      
      to see the output, which should be identical to the output from the original (key-splitting) WordCount.java with DATA=mice.

    4. Carry out these steps to be certain that the two programs are producing identical data.

      • Save the HDFS output from the WordCountValue job to a file in your cumulus lab11 directory.

        cumulus$  make print > wcv.out
        

      • Now, re-run the WordCount job

        cumulus$  make WordCount.hadoop
        

      • Now, you can use the Linux utility diff to compare the output from each job character by character:

        cumulus$  make print | diff - wcv.out
        1c1
        < hadoop fs -cat out/WordCount.29348/part-00000
        ---
        > hadoop fs -cat out/WordCountValue.26066/part-00000
        
        This output from diff indicates that all the key-value pairs were identical, and only the hadoop fs commands reported by make differed between the two runs. (The numbers 29348 and 26066 are job numbers assigned by the Hadoop Makefile, and they would differ even if we compared two runs of the same .java program.)

    5. Save your work in a commit.

      cumulus$  git add WordCountValue.java
      cumulus$  git commit -m "lab11: Word count with text in the value field of input"
      

  2. Hadoop performance and scalability.

    Try timing some runs of an already-compiled Hadoop program.

    Since WordCountValue.jar already exists, the Hadoop Makefile won't rebuild it if we launch more WordCountValue jobs without modifying the source file WordCountValue.java.

    So, it's reasonable to compare approximate performance for Hadoop runs of that .jar using time. Try the following, and record your timing results.

    cumulus$  time make WordCountValue.hadoop DATA=mice2
    cumulus$  time make hadoop DATA=lab10
    
    The Hadoop Makefile will reuse WordCountValue.jar in both cases. Record timing results and make observations in a file ~/PDC/lab11/README.

    Notes:

    • In one sample run on cumulus in January 2015, the timing for DATA=mice2 was

      real    0m22.900s
      user    0m0.921s
      sys     0m0.078s
      
      which indicates that local processing to launch the Hadoop job was about 1 second, even though the elapsed time was about 23 seconds. Thus, the bulk of the time was spent waiting for Hadoop services on the cluster to do their work. There are only about 260 bytes in mice2.txt, but Hadoop incurs lots of overhead preparing for potentially terascale or larger processing.

    • In another run with about 20,000 bytes of data, the timings were

      real    0m24.899s
      user    0m0.949s
      sys     0m0.078s
      
      Thus, the local user+sys time was only about 0.028 sec longer and the total elapsed real time was about 2 sec longer, which further indicates that most of the time is overhead, and little is needed to actually process such small data.

    • In a third run with DATA=/shared/gutenberg/all_n/group11 (294 Project Gutenberg books with line numbers in the key field, totalling about 125MB), the timings were

      real    1m2.239s
      user    0m1.275s
      sys     0m0.104s
      
      It took about 2.5 times as many seconds to process over 6000 times as much data. Hadoop is certainly designed for scaling well as the data size increases!

      Since cumulus had only 13 older quad-core nodes running on gigabit ethernet at the time of these tests, this scalability probably won't continue for long as the data size increases (e.g., it would probably take more than 3 minutes to process a terabyte of data, which is about 8000 times the size of group11). But given thousands of nodes for a cluster with better hardware and networking, it's quite believable that Hadoop can compute at petascale with acceptable performance.
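
The ratios quoted for the second and third runs can be rechecked with a few lines of Java. This is only a worked-arithmetic sketch: the class TimingRatios is hypothetical, and it assumes that "about 20,000 bytes" means exactly 20,000 and that "about 125MB" means 125 million bytes.

```java
// Worked arithmetic for the sample timings; the byte counts are rounded assumptions.
class TimingRatios {
    // Elapsed ("real") times in seconds, copied from the sample runs above.
    static final double SMALL_SECONDS = 24.899;  // run with about 20,000 bytes
    static final double LARGE_SECONDS = 62.239;  // 1m2.239s, run with about 125 MB
    static final double SMALL_BYTES = 20000;
    static final double LARGE_BYTES = 125e6;

    static double timeRatio() {
        return LARGE_SECONDS / SMALL_SECONDS;
    }

    static double dataRatio() {
        return LARGE_BYTES / SMALL_BYTES;
    }

    public static void main(String[] args) {
        System.out.println("time ratio: " + timeRatio());
        System.out.println("data ratio: " + dataRatio());
    }
}
```

Under these assumptions the time ratio is just under 2.5 and the data ratio is 6250, consistent with "about 2.5 times as many seconds" for "over 6000 times as much data."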

    ALSO DO THIS: Record your observations in a commit.

    cumulus$  git add README
    cumulus$  git commit -m "lab11: Timing results for WordCountValue"
    

  3. Lowering case and removing punctuation

    The Java String class has useful methods for operating on strings, including

    • toLowerCase(),

    • substring(), and

    • contains(), which can be used to test if a string of punctuation contains a particular character.

    Here is a test program Try.java that demonstrates how these methods can be used to check whether the first characters of some words (entered as command-line arguments) are vowels:

    public class Try {
        public static void main(String args[]) {
            int i;
            for (i = 0;  i < args.length;  i++)
                if ("aeiou".contains(args[i].toLowerCase().substring(0,1)))
                    System.out.println(args[i]);
        }
    }
    
    Notes:

    • No import statements are needed to use the name String.

    • The command-line arguments for main() are provided as an array of String, which we are calling args[] (but which could be called something else).

    • Like C++, Java accepts a declaration within the initialization of a for loop, as in for (int i = ...). Here, i is declared separately before the loop, which is also legal.

    • Here, we call the contains() method of the String "aeiou".

    • The charAt() method of String is the usual way to examine a character, but as the String documentation states, contains() expects a String (more precisely, a CharSequence) argument rather than a char. It was simpler to use substring() to retrieve the first character as a String than to convert the char value returned by charAt() to a String.

    • System.out.println(...) is the Java counterpart of C++'s cout << ... << endl .

    • To compile and run this test program, enter

      cumulus$   javac Try.java
      cumulus$   java Try Apple banana Mango orange
      Apple
      orange
      
      The two lines of output for the four command-line arguments are shown.
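
For comparison, here is a sketch of the same vowel test written with charAt() instead of substring(). The class VowelCheck and the method startsWithVowel() are hypothetical names; the sketch relies on the String method indexOf(), which accepts a character and returns -1 when that character is absent.

```java
// Variant of Try.java that examines the first character with charAt().
class VowelCheck {
    static boolean startsWithVowel(String word) {
        if (word.isEmpty()) {
            return false;  // guard: charAt(0) would throw on an empty string
        }
        char first = Character.toLowerCase(word.charAt(0));
        return "aeiou".indexOf(first) >= 0;  // indexOf returns -1 if the character is absent
    }

    public static void main(String[] args) {
        for (String arg : args) {
            if (startsWithVowel(arg)) {
                System.out.println(arg);
            }
        }
    }
}
```

Run with the arguments Apple banana Mango orange, this prints the same two words as Try.java. The explicit empty-string check matters because an empty command-line argument would otherwise cause an exception.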

    DO THIS: Apply these methods to create a Hadoop program WordCountValue2.java that counts frequencies of words after lowering the case and removing punctuation before and after the letters.

    • Hint: You can use a while loop to find the index of the first non-punctuation character at the beginning of a string, and a second while loop to find the index of the last non-punctuation character at the end of the string.

    • Once you know those two indices, you can use substring() to obtain a word with punctuation removed.

    Test your program WordCountValue2.java on DATA=mice2, and determine whether it counted accurately by comparing against wcv.out. For example, the frequency of the word three for WordCountValue2.java should equal the sum of frequencies of the words Three and three in wcv.out .

    ALSO DO THIS: Record your work in a commit.

    cumulus$  git add WordCountValue2.java
    cumulus$  git commit -m "lab11: WordCountValue ignoring case and punctuation"
    

  4. OPTIONAL: Index computation

    To compute an index with Hadoop directly, recall the IN/OUT spec for index_ in a prior WMR lab. ______

  5. OPTIONAL: Combiner. Observe that our code for main() includes the instruction

    //conf.setCombinerClass(Reduce.class);
    
    which suggests that these computations can make use of true Hadoop combiners. Thus, the key-value pairs sent to the reducer shouldn't all have value 1 in general. For mice2.txt, words such as blind and how should appear with value 2 emerging from the combiner. Verify this by removing // from the beginning of that line and showing the sequence of values received in each call of the reducer.
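
To see what a combiner changes, here is a plain-Java sketch under the assumption (for illustration only) that two separate map tasks each handle one line of the input. The class CombinerSketch and its methods are hypothetical, and no Hadoop classes are involved; how Hadoop actually groups map output for combining depends on the job, so the values seen in a real run may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of combining; no Hadoop classes are used.
class CombinerSketch {
    // Combiner stand-in: for one map task, add up the 1s emitted for each word.
    static Map<String, Integer> combine(List<String> wordsFromOneMapTask) {
        Map<String, Integer> partial = new TreeMap<>();
        for (String w : wordsFromOneMapTask) {
            partial.merge(w, 1, Integer::sum);
        }
        return partial;
    }

    // The values a reducer call would receive for one word: one partial sum
    // from each map task that emitted that word.
    static List<Integer> valuesFor(String word, List<Map<String, Integer>> partials) {
        List<Integer> values = new ArrayList<>();
        for (Map<String, Integer> p : partials) {
            if (p.containsKey(word)) {
                values.add(p.get(word));
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // Assumption for illustration: two map tasks, each handling one line.
        List<Map<String, Integer>> partials = new ArrayList<>();
        partials.add(combine(Arrays.asList("Three", "blind", "mice,", "three", "blind", "mice!")));
        partials.add(combine(Arrays.asList("As", "three", "blind", "mice?")));
        System.out.println(valuesFor("blind", partials));  // prints [2, 1] rather than [1, 1, 1]
    }
}
```

The reducer's sum is 3 either way; the combiner only reduces the number of key-value pairs that must travel from the map side to the reduce side.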

    • Make a copy CombinerTest.java of WordCountValue.java, and change the class name, job name, and JobConf() constructor parameter accordingly. (Consider a global substitution in your editor.) Also, remove // from the beginning of the line with a conf.setCombinerClass() call.

    • Copy the inner class Reduce, and change the copy's class name to Combine. (The Combine class should still implement Reducer<>, and have a method reduce(), etc. -- only one name needs to be changed.) Also, set the class Combine.class as the combiner instead of Reduce.class:

      conf.setCombinerClass(Combine.class);
      
      Compile and run CombinerTest.java with DATA=mice2. Compare to wcv.out to verify that it produces the same output as a WordCountValue job with that input data.

    • Now, change just the Reduce class to accumulate the incoming values from the iterator, and append them to the frequency when emitting.

      • The iterator delivers Hadoop IntWritable objects, not Java Strings, so we must convert from String. ______

      • ______

      • ______

      • ______

      • ______

      • ______

      • ______

      • ______

      • ______

      • ____________
      ______
    • ______

    • ______

    • ______

    • ______

    • ______

    • ____________
    ______

Deliverables

All of your code for this assignment should already be contained in commits. Modify the most recent commit message to indicate that you are submitting the completed assignment.

cumulus$  git commit --amend
Add the following to the latest commit message.
    lab11: complete

If your assignment is not complete, indicate your progress instead, e.g.,
    lab11: items 1-5 complete, item 6 partial
You can make later commits to submit updates.

Finally, pull/push your commits in the usual way.

cumulus$  git pull origin master
cumulus$  git push origin master


Files: README WordCount.java mice.txt Makefile WordCountValue.java WordCountValue2.java

This lab is due by Monday, December 3, 2018.