Home
>>    




Preparing and using HDFS data sets with WMR

CS 300, Parallel and Distributed Computing (PDC)
Due Tuesday, November 27, 2018

http://www.stolaf.edu/people/rab/cs1/hw/wmr.html#oview
Simple tallies
Overview of map-reduce techniques; context forwarding
Structured values and structured keys.
Tracing map-reduce
Multi-case reducers and mappers.
IN/OUT specifications

Preliminary material

  • ______


  • ______

Laboratory exercises

  1. Create a subdirectory ~/PDC/lab10 on a cluster head node such as cumulus.cs.stolaf.edu for your work on this lab. Manually keep copies of all your mappers and reducers in your lab10 directory. Note that your saved configurations within WMR are not easily accessible by graders, and may disappear if the system needs to be reinstalled or upgraded, so you will have to manually copy/paste mapper and reducer code to ~/PDC/lab10.

  2. Interacting with HDFS. The Hadoop Distributed File System (HDFS) is implemented on top of the Linux file systems on each cluster node. You can interact with the HDFS using the hadoop fs command on the cumulus head node. Here are some example commands. (See http://hadoop.apache.org/docs/r1.1.2/file_system_shell.html for complete documentation.)

    • Directory listing (ls) for a directory hdfs-dir on the HDFS.

      cumulus$  hadoop fs -ls hdfs-dir
      

      Examples:

      • To list the top level directory of example data sets:

        cumulus$  hadoop fs -ls /shared
        

      • To list the top level directory of your own HDFS (vs. Linux) directory (which is presumably empty at this point):

        cumulus$  hadoop fs -ls /user/username
        
        Since your own top-level HDFS directory is your default directory for hadoop fs operations, you may alternatively enter
        cumulus$  hadoop fs -ls 
        

    • Download (copy) a file from the HDFS to your local (Linux) directory on cumulus

      cumulus$  hadoop fs -get hdfs-path path
      
      Here, path is a path or filename on your cumulus directory. Example: To download the Complete Shakespeare file to your directory, naming the copy cs.txt:
      cumulus$  hadoop fs -get /shared/gutenberg/CompleteShakespeare.txt cs.txt
      

    • Upload (copy) a file from your local (Linux) directory on cumulus to your own HDFS directory on the cumulus cluster.

      cumulus$  hadoop fs -put path /user/username/hdfs-path
      
      Example:
      cumulus$  hadoop fs -put cs.txt /user/username
      The default filename on HDFS will be the same as the source file name, so if your username is rab, this will create /user/username .

      Note: HDFS directories /user/username owned by username have been created for you in advance.

    Now, use some of these commands to create a version of a Project Gutenberg book on the HDFS that has that book's ID and a line number in the key, and that line's content in the value, as follows.

    1. Download book 2701 (Moby Dick) from the /shared area on HDFS to your (Linux) lab10 directory on cumulus

      cumulus$  hadoop fs -get /shared/gutenberg/all/group8/2701.txt 2701.txt
      
      Note: A complete list of locally available Project Gutenberg books is available here.

    2. Insert the Project Gutenberg ID number, a colon, the line number, and a tab character at the beginning of each line of this book, to create a new version named 2701.txt_gn (the suffix gn represents "Gutenberg ID/line Numbers")

      You can write a program to accomplish this, or alternatively use an existing tool such as awk:

      cumulus$  awk '{print "2701:" NR "\t" $0}' 2701.txt > 2701.txt_gn
      
      Here,

      • the expression in curly brackets indicates what to do with each line,

      • the name NR evaluates to the current line number (Number of the Record), starting with 1 for the first line,

      • printing the expression "\t" inserts a tab character in the output, which is the separator between key and value fields for Hadoop, and

      • $0 evaluates to the entire current input line in awk. (FYI, $1 evaluates to the first word, $2 to the second word, etc., where the default word delimiter is whitespace.)

      Check that the file 2701.txt_gn was modified correctly, for example, using emacs on cumulus. Here are the first few lines of Chapter 1, which begins on line 516:

      2701:516        CHAPTER 1. Loomings.
      2701:517
      2701:518
      2701:519        Call me Ishmael. Some years ago--never mind how long precisely--having
      2701:520        little or no money in my purse, and nothing particular to interest me on
      2701:521        shore, I thought I would sail about a little and see the watery part of
      2701:522        the world. It is a way I have of driving off the spleen and regulating
      2701:523        the circulation. Whenever I find myself growing grim about the mouth;
      2701:524        whenever it is a damp, drizzly November in my soul; whenever I find
      2701:525        myself involuntarily pausing before coffin warehouses, and bringing up
      

    3. Now, upload your file 2701.txt_gn to your own HDFS directory /user/username

  3. Using .gitignore

    The cumulus files lab10/2701.txt and lab10/2701.txt_gn are worth keeping on the cumulus head node file system for further computation, but there is no need to add these large files to your git repository, especially after uploading 2701.txt_gn to HDFS. You can use a file .gitignore to prevent git from including such files in future commits.

    Git will find your file .gitignore in your ~/PDC directory, which you initialized for git and associated with your remote on stogit. Edit the file ~/PDC/.gitignore to add the following line:

    lab10/2701.*
    
    This line is a pattern in the format used by shell wildcards, and its presence in .gitignore causes all matching files to be ignored in git operations. We could also use
    lab10/2701.txt*
    
    to match our two files, which would be useful if we wanted git to include other 2701.* files (perhaps if you had a file named 2701.info).

  4. Perform the same modifications to five other Project Gutenberg books in English of your choice. Include at least two books written before 1700 and at least two books written after 1850. Place the resulting five files in a new subdirectory /user/username/lab10 of your top-level HDFS directory. (Include 2701.txt_gn as a sixth book if desired.)

    • You can use a combination of the Project Gutenberg site, Wikipedia, and the local list to select books written at the required times. Some authors who wrote in English before 1700 include Shakespeare, Milton, Marlowe, Bacon, and Ben Jonson.

    • You can create your HDFS subdirectory using hadoop fs -mkdir, as described in the HDFS guide. There is also a HDFS operation -cp for copying 2701.txt_gn to your new subdirectory in HDFS, if desired.

    • For five more books, this is easy enough to do by hand, following the pattern for 2701.txt. However, this would be tedious and error prone for hundreds or thousands of books. Alternatively, assuming you choose books available on the cluster, you can create a shell script to perform this work, as follows.

      • Create a text file named insert_gn in your lab10 directory (on cumulus) containing the following:

        #!/bin/sh
        # Example call:   insert_gn groupN/N ...
        
        for arg
        do  gutid=`basename $arg`   # discard the group
            hadoop fs -get /shared/gutenberg/all/$arg.txt $gutid.txt
            awk '{print "'$gutid':" NR "\t" $0}' $gutid.txt > $gutid.txt_gn
            hadoop fs -put $gutid.txt_gn /user/$USER
        done
        

        Here:

        • the first line indicates that this is a shell script, i.e., a file of instructions for the shell program /bin/sh;

        • the for introduces a loop that iterates over all command-line arguments. If you invoke this script as

          cumulus$  ./insert_gn group10/3322 group9/2929
          
          then the body of that for loop will be carried out once assigning group10/3322 to the shell variable arg, then a second time assigning group9/2929 to arg ;

        • the command

              gutid=`basename $arg`   # discard the group
          
          assigns a value to a shell variable gutid, the "backtick operator" (backwards quote `) executes a shell command and inserts its output in that place, and the # starts a comment (Note: there must be no space around the character = in this command);

        • the expressions $arg and $gutid substitute the values of those variables; and

        • the expression $USER substitutes the value of your username.

      • You must make the shell script insert_gn executable in order to use it.

        cumulus$  chmod +x insert_gn
        

      • Note: There are thousands of Project Gutenberg books on our cluster. In a project, you could use the following version of the script in order to avoid create unneeded copies of all the files on the cumulus file system.

        #!/bin/sh
        # Example call:   insert_gn groupN/N ...
        
        for arg
        do  gutid=`basename $arg`   # discard the group
            hadoop fs -cat /shared/gutenberg/all/$arg.txt |
            awk '{print "'$gutid':" NR "\t" $0}' |
            hadoop fs -put - /user/$USER/$gutid.txt_gn
        done
        

        In this script,

        • we use hadoop fs -cat to download a file to standard output,

        • we use a pipeline to direct that file's content to awk, then on to the upload via hadoop fs -put, and

        • the dash argument for hadoop fs -put indicates that the file to upload is provided through standard input.

        When there are many files to process, this version of the script will save file space on cumulus and also some time writing those files (although the HDFS operations will dominate the running time in this script).

    • If you create any temporary files while producing your five new .txt_gn files, you can either take care not to git add and commit them, or add them to ~/PDC/.gitignore, in order to avoid adding them to your git repository.

    If you created a script insert_gn, save your work in a commit.

    cumulus$  git add insert_gn
    cumulus$  git commit -m "lab10: Shell script for adding gutenberg ID and line number as HDFS key"
    

  5. Modify your mapper wc_mapper.ext in the previous lab to obtain the words to count from the function/method's second argument value instead of its first argument key. Using WMR, test your mapper and the reducer wc_reducer.ext (which doesn't need to be changed) with /user/username/2701.txt_gn (on HDFS) as input. Call the revised mapper wcv_mapper.ext .

    Verify that running wcv_mapper and wc_reducer and /user/username/2701.txt_gn produces the same results as running wc_mapper and wc_reducer on the unmodified book /shared/gutenberg/all/group8/2701.txt.

    Notes

    • Add a comment near the top of the file wcv_mapper.ext to indicate that it should be run with wc_reducer.ext. This comment may be less necessary when the prefixes are the same, but it is especially important when prefixes differ for the mapper and reducer files.

    • Viewing a few lines of the output of each WMR run would probably convince you that the outputs are identical. But you may want a more careful comparison of outputs for a project. Here is a way to compare WMR output files character-by-character.

      • The output path for a WMR run (using Hadoop, not Test) is shown at the end of the output, e.g., out/job-179, for a job named job. This is the name of a subdirectory of the wmr account's home directory on HDFS, i.e., /user/wmr/out/job-179/ .

      • Use hadoop fs -ls to list that directory. Assuming you used one reducer (the default for a "small" job), there will be one output file named part-00000 which contains the output from that job, plus a subdirectory _logs of diagnostic information.

        Note: If you use multiple reducers, there will be a part-NNNNN file for each. Hadoop will distribute the key computations among the reducer tasks, so output from some keys will appear in part-00000, others in part-00001, etc.

      • Retrieve the output files from the two runs, then compare them using diff on cumulus. For example, if your two output directories are job-179 and job-180, and each used a single reducer, then you can accomplish this comparison as follows:

        cumulus$  hadoop fs -get /user/wmr/out/job-179/part-00000 job-179
        cumulus$  hadoop fs -get /user/wmr/out/job-180/part-00000 job-180
        cumulus$  diff job-179 job-180
        
        If this diff command returns empty output, there are no differences between the files.

    ALSO DO THIS: Save your work in a commit.

          Manually copy your code wcv_mapper.ext and wc_reducer.ext to your cumulus lab10 directory.

    cumulus$  git add wcv_mapper.ext wc_reducer.ext
    cumulus$  git commit -m "lab10: Word count with text in value instead of key"
    

  6. Basic WMR techniques.

    In the previous lab, we saw a number of basic techniques for computing with WMR.

    • We can tally (e.g., count) entities using reducers as in the word-count examples. Since we write our own reduce operations, we can perform a wide range of analyses on the values associated with a given key, far beyond merely counting frequencies.

    • We can take advantage of Hadoop's sorting during the shuffle phase, which led to our results being automatically sorted by word in the basic word-count programs wc_mapper.ext and wc_reducer.ext.

      Note 1: This sorting effect is an optimization of Hadoop performance that would be difficult to accomplish otherwise through mappers and reducers. (It would not be difficult to sort values within each key, but sorting the keys would take a lot of extra work.)

      Note 2: This overall sorting by key depends on having a single reducer. If there are multiple reducers, the collection of keys for that reducer would be sorted automatically through shuffling, but the output files for two different reducers would need to be merged to get an overall sorted list.

    • We used multiple Hadoop (WMR) cycles to solve problems, e.g., to sort results by frequency count instead of by word.

    • The identity mapper and identity reducer are often helpful. We saw this when sorting by count: we only needed to interchange keys and values at the mapper, then apply the identity reducer in order to achieve sorting.

    • Converting all words to lower case is an example of grouping data values together under a single data value. This can lead to more usable results. For example, in the word-frequency counting example, we may well want the, The, "the, and (the all to count towards the tally for the word the.

    • Pre-tallying, whether using a Hadoop combiner or computing subtotals within a WMR (or Hadoop) mapper.

    In order to describe the requirements for a map-reduce cycle, we will write an IN/OUT spec that describe how a mapper and reducer transform input key-value pairs to output key-value pairs. Here are is an IN/OUT spec for the basic wc_mapper.ext and wc_reducer.ext:

    Mapper for wc_

    # IN keys and values:
    #   key: a line of text   value: none
    # OUT keys and values:
    #   key: one word from that line of text   value: 1
    

    Reducer for wc_

    # IN keys and values:
    #   key: one word from the original text   value: 1
    #   NOTE:  the values are packed into an iterator  iter
    # OUT keys and values:
    #   key: that one word   value: the sum of the 1's from iter
    

    DO THIS: We used an identity reducer in the previous lab to sort by frequency instead of sorting by word: The mapper swapped key and value, and the shuffle stage took care of the sorting, so the reducer needed only replicate the same key-value pairs it received.

    Program an identity mapper id_mapper.ext that produces a pair with the same key and value as its two arguments. (It's a very short operation...) Test your identity mapper and identity reducer together with a file such as 2701.txt_gn that has both keys and values, and verify that the output file is identical to the input file.

    • Note: The identity mapper and reducer and variants can be useful for debugging map-reduce cycles. If you find an issue that doesn't arise with small test data, you can use the identity reducer to examine the intermediate key-value pairs emitted from the mapper stage. The identity mapper could then be paired with the desired reducer to complete the computation, perhaps after modifying the intermediate output somehow for debugging purposes.

    ALSO DO THIS: Save your work in a commit.

          Manually copy your code id_mapper.ext to your cumulus lab10 directory.

    cumulus$  git add id_mapper.ext
    cumulus$  git commit -m "lab10: Identity mapper"
    

  7. Creating an index.

    As a further application of some of these techniques, produce an index of words in Moby Dick.

    1. Instead of determining the frequency of each word as in the word count exercises, we want all the locations of each word. We will use the key in 2701.txt_gn (on HDFS) to identify the locations of words appearing in the text. Therefore, the map-reduce cycle should satisfy this IN/OUT spec:

      Mapper for index_

      # IN keys and values:
      #   key: a location identifier (PG id:line number)   value: a line of text
      # OUT keys and values:
      #   key: one word from that line of text, after lowering case and 
      #     removing any surrounding punctuation
      #   value: that location identifier
      

      Reducer for index_

      # IN keys and values:
      #   key: one word from the original text   value: a location identifier
      # OUT keys and values:
      #   key: that one word   value: comma-separated list of location identifiers
      
    2. Call your mapper and reducer files index_mapper.ext and index_reducer.ext. Start from copies of wclower_mapper.ext and wc_reducer.ext from the previous lab, and edit to satisfy the spec.

    3. Test your index mapper and reduce using small test data and the Test interface. Note: You can enter key-value pairs directly into the input box of a WMR new job page by entering a tab character between the key and the value.

      As an example, the input

      500:1   It was the best of times,
      500:2   it was the worst of times.
      
      should produce an index such as the following:
      best    500:1,
      it      500:1, 500:2,
      of      500:2, 500:1,
      the     500:2, 500:1,
      times   500:1, 500:2,
      was     500:2, 500:1,
      worst   500:2,
      
      You may optionally program to omit the trailing comma in this output.

    4. Save your work in a commit.

            Manually copy your code index_{mapper,reducer}.ext to your cumulus lab10 directory.

      cumulus$  git add index_{mapper,reducer}.ext
      cumulus$  git commit -m "lab10: Index of words appearing in text with line numbers"
      

  8. There is no guarantee that the key-value pairs received by a reducer will be in order of value, since the shuffle stage only sorts by key. Create new versions index_mapper2.ext and index_reducer2.ext that print the location references in sorted order. This will require you to sort values received in the reducer, the only place where all the values can be seen for a given key.

    Note: For this exercise, it is enough to sort the values in lexicographic order, even though that means that 2701:1004 will come bfore 2701:2. If you wish, you can devise a more sophisticated sort that uses numerical ordering after the colon.

    Save your work in a commit.

    cumulus$  git add index_{mapper,reducer}2.ext
    cumulus$  git commit -m "lab10: Index with sorted references"
    

  9. Now, apply your index mapper and reducer to your HDFS subdirectory of five or six books /user/username/lab10.

    You need only give the directory name as the input, because Hadoop acts on all top-level files in an HDFS directory when you specify a directory as input.

    When examining the Hadoop results, verify that index entries arise from multiple files, and are sorted.

Deliverables

All of your code for this assignment should already be contained in commits. Modify the most recent commit message to indicate that you are submitting the completed assignment.

cumulus$  git commit --amend
Add the following to the latest commit message.
    lab10: complete

If your assignment is not complete, indicate your progress instead, e.g.,
    lab10: items 1-5 complete, item 6 partial
You can make later commits to submit updates.

Finally, pull/push your commits in the usual way.

cumulus$  git pull origin master
cumulus$  git push origin master


Files: insert_gn wcv_mapper.ext wc_reducer.ext id_mapper.ext index_mapper.ext index_reducer.ext index_mapper2.ext index_reducer2.ext

Submit your work on this lab using git. Assuming that you did all your work on cumulus, include the name cumulus in your commit message.

%  cd ~/PDC
%  git pull origin master
%  git add -A
%  git commit -m "Lab 23 submit (cumulus)"
%  git push origin master
Likewise, if you worked on more than one system, you can rename your lab10 directories and submit each, with commit messages indicating the computer on which that commit was created.

Also, fill out this form to report on your work.

This lab is due by Tuesday, November 27, 2018.