The map-reduce model
CS 300 (PDC)
The list-oriented functions map and reduce have been part of LISP-family languages for decades.

map
2 arguments: func, a one-argument procedure; lis, a list of valid arguments for func.
Return: a list consisting of the return values from applying func to each element of lis.
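For instance:

(map (lambda (x) (+ x 1)) '(10 20 30)) --> (11 21 31)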
reduce (commutative case)
2 arguments: func, a commutative function with 0, 1, or 2 arguments of the same type T that returns type T; lis, a list of arguments for func.
Return: func is applied to all the arguments in succession, returning a single value of type T.
Example:

(define square (lambda (x) (* x x)))
(reduce + (map square '(1 2 3 4 5))) --> 55

Here, 55 = 1^2 + 2^2 + 3^2 + 4^2 + 5^2.
Scheme includes map as a predefined function. Here is an implementation of reduce:
(define reduce
  (lambda (func lis)
    (cond ((null? lis) (func))                  ; empty list: call func with no arguments
          ((null? (cdr lis)) (func (car lis)))  ; one element: call func with that element
          (else (func (car lis)                 ; otherwise, combine the head with
                      (reduce func (cdr lis)))))))  ; the reduction of the rest
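Each branch of the cond handles a different list length. For example, with Scheme's +, whose zero-argument value is 0:

(reduce + '()) --> 0 ; (+) with no arguments is 0
(reduce + '(7)) --> 7 ; (+ 7) is 7
(reduce + '(1 2 3)) --> 6 ; (+ 1 (+ 2 3))

Note that this implementation groups from the right; for a commutative, associative func such as +, the grouping does not affect the result.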
The MPI collective communication call MPI_Reduce() is inspired by LISP's useful reduce operation.
Although the map/reduce programming strategy has been around for several decades, its application to data-intensive scalable computing on clusters emerged more recently, as described in Google's 2004 paper. That paper describes Google's proprietary MapReduce software, now used internally for thousands of large- and extremely large-scale computations on a daily basis.
Rather than create their own proprietary map-reduce tool to compete with Google, Yahoo! engineers instead turned their energies toward the Apache Hadoop project, which is producing an open-source implementation. Yahoo! remains the primary contributor to Hadoop development.
As of July 2008, Yahoo! runs Hadoop jobs on approximately 13,000 nodes, involving several petabytes of compressed data; its largest cluster contains 2,000 nodes and runs Hadoop jobs continuously.
______
Hadoop computation

Example: Count frequencies of words in a large number of web pages.
Mapper; intermediate list of key-value pairs; reducer.
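Here is a rough sketch of that pipeline in Scheme, mirroring the map and reduce functions above rather than Hadoop's actual Java API (map-phase, shuffle, and reduce-phase are illustrative names, not Hadoop's):

;; mapper: emit a (word . 1) pair for each word in the input
(define map-phase
  (lambda (words)
    (map (lambda (w) (cons w 1)) words)))

;; shuffle: group the pairs by key, collecting each key's counts
(define shuffle
  (lambda (pairs)
    (if (null? pairs)
        '()
        (let* ((groups (shuffle (cdr pairs)))
               (key (car (car pairs)))
               (count (cdr (car pairs)))
               (entry (assoc key groups)))
          (if entry
              (begin (set-cdr! entry (cons count (cdr entry)))
                     groups)
              (cons (list key count) groups))))))

;; reducer: sum the counts collected for each key
(define reduce-phase
  (lambda (groups)
    (map (lambda (g) (cons (car g) (reduce + (cdr g))))
         groups)))

(reduce-phase (shuffle (map-phase '(the cat saw the dog))))
;; --> ((cat . 1) (saw . 1) (the . 2) (dog . 1))

In Hadoop, the mappers and reducers run in parallel on many machines, and the shuffle moves the intermediate key-value pairs across the network so that all pairs with the same key arrive at the same reducer.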
Parallelizing on a cluster
Locality
Data: HDFS, Hadoop Distributed File System
File system, disk blocks
Replication (3 copies of each block)
Block size, e.g., 64 MB
Implemented over native file system
Data nodes hold individual blocks.
The name node (NameNode daemon) is responsible for managing the DFS, e.g., determining which files will be stored on which blocks. The name node also logs all changes to the DFS, so contents can be reconstructed in case of failure.
Other file systems supported include Amazon's S3 file system, FTP (a pseudo file system; all data is obtained via FTP), and HTTP/HTTPS.
Hadoop's job tracker process accepts jobs and schedules them on task trackers that are running near the data (in multi-rack clusters). The job tracker is responsible for orchestrating all the stages of each Hadoop job, including all mappers and reducers for that job.
Task trackers have 4 slots each, for running mappers and reducers
Mapper code and reducer code are implemented as members of classes, which are submitted in a Hadoop run.
A configuration file or configuration methods are used to request the number of slots, a combiner, etc.
Assumptions:
Hadoop may be running on a system ranging from a single node (e.g., a single CS Link machine) to a large cluster (e.g., a 2000-node cluster on multiple racks in a data center). Fault tolerance must be planned for the largest scale, which has the most ways for faults to occur.
In a large cluster, no matter how reliable the machines are supposed to be, at least some failures are almost inevitable, but any particular machine is unlikely to fail.
(Thus, for example, Google buys inexpensive PCs with acceptable reliability rather than high-reliability machines, obtaining more nodes, and designs its algorithms to tolerate the failure of some of them.)
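As a rough illustration (with an assumed, not measured, failure rate): if each machine independently fails on a given day with probability 0.001, a 2000-node cluster experiences at least one failure that day with probability 1 - 0.999^2000, or about 0.86, even though each individual machine is 99.9% reliable.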
In a large, multi-rack cluster, a rack failure may occur that implies failure of all the computers in that rack.
Job characteristics. Hadoop jobs typically take hours to run, and are unlikely to be real-time operations.
For example, Google or Yahoo! would use the map-reduce model to produce indices or databases for speedy response to user searches, and being a few hours late to update an index rarely makes a noticeable difference.
For purposes of this discussion, we will say two processes are distant if they are located on different computers and, on large clusters, on different racks. Likewise, nearby means on the same computer, or at least on the same rack.
Machines that are on the same rack typically share power delivery (and UPS), network switches, cooling delivery, etc. These shared services may behave as single points of failure. Some of them also represent performance opportunities. In particular, network bandwidth between machines on the same rack is likely to be better than between machines on different racks.
HDFS.
The name node is a single point of failure. Since there is only one, and any one given machine is unlikely to fail, failures of the name node are relatively uncommon, even in a large cluster. However, a failure of the name node could be catastrophic: if the machine merely crashed, recent transactions may be retrievable from the name node's log, but if that log was lost due to a disk failure, the DFS contents may become unusable.
Avoid locating the name node on a machine that is exhibiting questionable reliability.
Consider reliable disk storage, e.g., RAID, for the name node's log. At the least, make frequent backups of the file system where the name node's logs reside.
Hadoop provides for a secondary name node to receive periodic checkpoints from the name node, which can thus be used to recover from failure of the name node. For better fault tolerance, the secondary name node should be distant from the name node. (However, it would then be important to repair quickly any network failures between the name node and its secondary name node...)
Triplicate blocks. The 3-fold redundancy among blocks reduces the total capacity of the DFS to one-third of the underlying physical capacity, a heavy cost. The tradeoff is much better fault tolerance.
If one or two of the nodes containing copies of a block fail, the remaining copy or copies can be used to create new copies on replacement computers.
If one of the three copies becomes inconsistent with the other two (i.e., different from them), then that copy can be replaced in the DFS by a new copy of the data shared by the other two.
Note that an inconsistency like this can be an early sign that the computer containing the inconsistent copy is about to fail.
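As a toy illustration of this two-out-of-three repair idea in Scheme (real HDFS compares block checksums rather than whole copies, and repair-copies is a made-up name):

;; Given the three stored copies of a block, replace a single
;; inconsistent copy with the value the other two agree on.
;; (Toy sketch only; HDFS actually detects corruption via checksums.)
(define repair-copies
  (lambda (a b c)
    (cond ((equal? a b) (list a b a))   ; c disagreed (or all three agree)
          ((equal? a c) (list a a c))   ; b disagreed
          ((equal? b c) (list b b c))   ; a disagreed
          (else 'unrecoverable))))      ; no majority: cannot tell which is right

(repair-copies "data" "data" "dat?") ; --> ("data" "data" "data")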
Job tracker. The job tracker is the coordinator of an entire Hadoop job (i.e., one map/shuffle/reduce cycle). It is a single point of failure, and the current implementation of Hadoop does not have a backup plan: your job must be started over after a new job tracker is created.
Why less protection for the Hadoop job tracker than for the HDFS name node?
Given the Hadoop job characteristics, it's typically acceptable to simply restart a failed Hadoop job from scratch rather than building elaborate mechanisms to recover it.
However, the contents of the DFS may include results from hundreds of prior Hadoop jobs, some of which may be difficult or impossible to reconstruct. Also, all running jobs will potentially be affected by a failure of the DFS, not just the jobs whose job trackers are on a failed node, because maintaining distance between triplicate copies of blocks for all blocks leads to a tangle of relationships between files and machines on a cluster. Thus, failure of the DFS is more catastrophic than failure of a few Hadoop jobs.
Map stage
Shuffle stage
Reduce stage