Hadoop Quick Notes :: Part - 3


Bhaskar S 12/27/2013


Introduction

In Part-2 we started our 3-node Hadoop 1.x cluster and explored the Hadoop Distributed File System (HDFS).

Now, we will explore the Hadoop MapReduce framework.

Hands-on with Hadoop MapReduce 1.x

Hadoop MapReduce is a distributed execution framework that enables one to develop applications that process large data sets in parallel on a cluster of slave nodes (running on commodity hardware) in a reliable and fault-tolerant way.

The unit of work in Hadoop MapReduce is a job. A job has a map phase and a reduce phase.

During the map phase, the input data read from HDFS is split into chunks that are processed by the map function executing on the slave nodes of the cluster. During the reduce phase, the results from the map function are fed as input to the reduce function executing on the slave nodes of the cluster. The reduce function consolidates the input into the final result and writes the output to HDFS.

In other words, the map phase performs filtering and sorting, while the reduce phase performs aggregation.

When we learn a new programming language, it is a tradition to first implement and test a simple "Hello World" program. In the case of Hadoop MapReduce, it is the "Word Count" program. The purpose of this simple Hadoop MapReduce job is to count the occurrences of each distinct word in a document. The map phase counts the words in each line of the document, while the reduce phase aggregates the per-line counts into word counts across the whole document.
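
To make the counting logic concrete before we look at the Hadoop version, the following is a minimal sketch in plain Java (no Hadoop involved); the two sample lines are made up purely for illustration:

import java.util.HashMap;
import java.util.Map;

// A plain-Java sketch of the word count logic: the "map" step emits a
// count of 1 per word in each line, and the "reduce" step sums up the
// counts per word. The sample lines are made up for illustration only.
public class WordCountSketch {
    public static void main(String[] args) {
        String[] lines = { "joy to the world", "the world in solemn stillness" };
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String line : lines) {                  // "map": process one line at a time
            for (String word : line.split("\\s+")) { // tokenize the line into words
                Integer current = counts.get(word);  // "reduce": aggregate per word
                counts.put(word, current == null ? 1 : current + 1);
            }
        }
        System.out.println(counts);                  // e.g. {the=2, world=2, joy=1, ...}
    }
}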

The Hadoop 1.x distribution already comes with a wordcount sample program. The wordcount sample program is part of the hadoop-examples-1.2.1.jar file located in the directory $HADOOP_PREFIX.

In order to see the wordcount Hadoop MapReduce program in action, we need some large text file.

We can download a variety of books in text format from the Project Gutenberg site. For our test, we downloaded the book Christmas in Poetry and saved it as Book-ChristmasInPoetry.txt.

Let us copy this file to the /data directory in HDFS by issuing the following command:

hadoop fs -put ./Downloads/Book-ChristmasInPoetry.txt /data

This command will not generate any output.
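
As an aside, the same copy can also be performed programmatically through the HDFS Java API. The following is a minimal sketch; it assumes the cluster configuration files (core-site.xml and hdfs-site.xml) are on the classpath and uses the same source and destination paths as the command above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Minimal sketch: the programmatic equivalent of "hadoop fs -put".
// Assumes core-site.xml/hdfs-site.xml are on the classpath so that the
// Configuration object points at our NameNode.
public class HdfsPut {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Copy the local file into the /data directory in HDFS
        fs.copyFromLocalFile(new Path("./Downloads/Book-ChristmasInPoetry.txt"),
                             new Path("/data"));
        fs.close();
    }
}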

Issue the following command to list all the file(s) under the /data directory of HDFS:

hadoop fs -ls /data

The following will be the output:

Output-1

Found 1 items
-rw-r--r--   2 hadoop supergroup      63009 2013-12-27 17:12 /data/Book-ChristmasInPoetry.txt

We are now all set to execute our first Hadoop MapReduce program in our 3-node cluster. Issue the following command to execute the wordcount sample Hadoop MapReduce program:

hadoop jar $HADOOP_PREFIX/hadoop-examples-1.2.1.jar wordcount /data /out

The following will be the output:

Output-2

13/12/27 17:15:54 INFO input.FileInputFormat: Total input paths to process : 1
13/12/27 17:15:54 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/12/27 17:15:54 WARN snappy.LoadSnappy: Snappy native library not loaded
13/12/27 17:15:55 INFO mapred.JobClient: Running job: job_201312271637_0002
13/12/27 17:15:56 INFO mapred.JobClient:  map 0% reduce 0%
13/12/27 17:16:01 INFO mapred.JobClient:  map 100% reduce 0%
13/12/27 17:16:10 INFO mapred.JobClient:  map 100% reduce 33%
13/12/27 17:16:11 INFO mapred.JobClient:  map 100% reduce 100%
13/12/27 17:16:12 INFO mapred.JobClient: Job complete: job_201312271637_0002
13/12/27 17:16:12 INFO mapred.JobClient: Counters: 29
13/12/27 17:16:12 INFO mapred.JobClient:   Job Counters 
13/12/27 17:16:12 INFO mapred.JobClient:     Launched reduce tasks=1
13/12/27 17:16:12 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=5381
13/12/27 17:16:12 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/12/27 17:16:12 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/12/27 17:16:12 INFO mapred.JobClient:     Launched map tasks=1
13/12/27 17:16:12 INFO mapred.JobClient:     Data-local map tasks=1
13/12/27 17:16:12 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=9573
13/12/27 17:16:12 INFO mapred.JobClient:   File Output Format Counters 
13/12/27 17:16:12 INFO mapred.JobClient:     Bytes Written=30043
13/12/27 17:16:12 INFO mapred.JobClient:   FileSystemCounters
13/12/27 17:16:12 INFO mapred.JobClient:     FILE_BYTES_READ=43029
13/12/27 17:16:12 INFO mapred.JobClient:     HDFS_BYTES_READ=63132
13/12/27 17:16:12 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=200963
13/12/27 17:16:12 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=30043
13/12/27 17:16:12 INFO mapred.JobClient:   File Input Format Counters 
13/12/27 17:16:12 INFO mapred.JobClient:     Bytes Read=63009
13/12/27 17:16:12 INFO mapred.JobClient:   Map-Reduce Framework
13/12/27 17:16:12 INFO mapred.JobClient:     Map output materialized bytes=43029
13/12/27 17:16:12 INFO mapred.JobClient:     Map input records=2015
13/12/27 17:16:12 INFO mapred.JobClient:     Reduce shuffle bytes=43029
13/12/27 17:16:12 INFO mapred.JobClient:     Spilled Records=6554
13/12/27 17:16:12 INFO mapred.JobClient:     Map output bytes=91362
13/12/27 17:16:12 INFO mapred.JobClient:     Total committed heap usage (bytes)=177016832
13/12/27 17:16:12 INFO mapred.JobClient:     CPU time spent (ms)=970
13/12/27 17:16:12 INFO mapred.JobClient:     Combine input records=9373
13/12/27 17:16:12 INFO mapred.JobClient:     SPLIT_RAW_BYTES=123
13/12/27 17:16:12 INFO mapred.JobClient:     Reduce input records=3277
13/12/27 17:16:12 INFO mapred.JobClient:     Reduce input groups=3277
13/12/27 17:16:12 INFO mapred.JobClient:     Combine output records=3277
13/12/27 17:16:12 INFO mapred.JobClient:     Physical memory (bytes) snapshot=184672256
13/12/27 17:16:12 INFO mapred.JobClient:     Reduce output records=3277
13/12/27 17:16:12 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=771518464
13/12/27 17:16:12 INFO mapred.JobClient:     Map output records=9373

For the wordcount sample Hadoop MapReduce program, we specify as input the HDFS directory containing the text file(s) to be processed (/data in our case), and as output the HDFS directory where we want the results to be stored (/out in our case). Make sure the /out directory is not already present in HDFS; the wordcount program will create it for us.
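
If /out is left over from a previous run, the job will refuse to start. A driver program can guard against this by removing the output directory up front; the following is a minimal, hedged sketch using the HDFS Java API (the stock wordcount sample does not do this):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Minimal sketch: delete the job output directory if it already exists so
// that a re-run does not fail. Shown for illustration only; the wordcount
// sample shipped with Hadoop does not do this.
public class CleanOutputDir {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path out = new Path("/out");
        if (fs.exists(out)) {
            fs.delete(out, true); // true => delete the directory recursively
        }
        fs.close();
    }
}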

When the wordcount sample Hadoop MapReduce program completes, issue the following command to list all the file(s) under the /out directory of HDFS:

hadoop fs -ls /out

The following will be the output:

Output-3

Found 3 items
-rw-r--r--   2 hadoop supergroup          0 2013-12-27 17:16 /out/_SUCCESS
drwxr-xr-x   - hadoop supergroup          0 2013-12-27 17:15 /out/_logs
-rw-r--r--   2 hadoop supergroup      30043 2013-12-27 17:16 /out/part-r-00000

From Output-3, we see a file named _SUCCESS. The presence of this file indicates that the wordcount sample Hadoop MapReduce program completed successfully.

The word count results will be in the file named part-r-00000 (the r indicates it was written by a reducer, and 00000 is the index of the reducer that produced it; since this job used a single reducer, there is just one part file).

We are not going to display the entire contents of this file; rather, we will peek at its tail. Issue the following command to tail the contents of the file /out/part-r-00000 in HDFS:

hadoop fs -tail /out/part-r-00000

The following will be the output:

Output-4

www.gutenberg.org	4
www.gutenberg.org/contact	1
www.gutenberg.org/donate	2
www.gutenberg.org/license.	1
yard,	1
ye	9
ye,	3
year	1
year's	1
year;	2
yearly	1
years,	1
yet	1
yonder	1
you	69
you!)	1
you,	4
you.	1
young	2
your	30
yours	1
yourselves	1
The	1
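
The same file can also be read programmatically through the HDFS Java API. The following is a minimal sketch that streams the entire /out/part-r-00000 file to the console (unlike -tail, which only shows the last kilobyte of the file):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Minimal sketch: stream an HDFS file to the console, the programmatic
// counterpart of "hadoop fs -cat /out/part-r-00000".
public class HdfsCat {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in = fs.open(new Path("/out/part-r-00000"));
        try {
            IOUtils.copyBytes(in, System.out, conf, false); // false => do not close the streams
        } finally {
            IOUtils.closeStream(in);
            fs.close();
        }
    }
}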

Let us take a peek at the source code of the wordcount sample Hadoop MapReduce program.

The following is the listing for the Java class WordCount:

Listing-1
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Let us try to understand the source code from Listing-1 above.

For the map phase, we need a map() function, which can be implemented by extending the base class org.apache.hadoop.mapreduce.Mapper.

For the reduce phase, we need a reduce() function, which can be implemented by extending the base class org.apache.hadoop.mapreduce.Reducer.

A Hadoop MapReduce program works on key-value pairs. The input from HDFS is split into key-value pairs (K1, V1), which is processed by the map() function.

The output of the map() function from the different slave nodes in the cluster is then sorted and grouped by key into key-value pairs (K2, List<V2>). Realize that the map() function on different slave nodes can output a value for the same key K2. Hence, the value is a list of all the values emitted for that key by the map() function across all the slave nodes.

The key-value pairs (K2, List<V2>) from the map phase are then processed by the reduce() function to produce the key-value pairs (K3, V3).
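
As a concrete, made-up illustration of this flow for word count, two short input lines would move through the phases roughly as follows:

(0, "joy to the world")
(17, "the world in solemn stillness")                              <-- (K1, V1): byte offset, line of text

("joy", 1) ("to", 1) ("the", 1) ("world", 1)
("the", 1) ("world", 1) ("in", 1) ("solemn", 1) ("stillness", 1)   <-- output of map()

("in", [1]) ("joy", [1]) ("the", [1, 1]) ("world", [1, 1]) ...     <-- (K2, List<V2>) after sorting/grouping

("in", 1) ("joy", 1) ("the", 2) ("world", 2) ...                   <-- (K3, V3): output of reduce()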

The following Figure-1 illustrates the Hadoop MapReduce process:

Figure-1: MapReduce

Both the Mapper and Reducer are generic classes that take four type parameters specifying the input key, input value, output key, and output value types. The input types of the reduce() method must match the output types of the map() method; in Listing-1, both are (Text, IntWritable).

The key and value objects must be serializable by the Hadoop MapReduce framework. Hence, they have to implement the org.apache.hadoop.io.Writable interface.

As can be inferred from the code, the class org.apache.hadoop.io.Text is a serializable implementation for the java.lang.String type and the class org.apache.hadoop.io.IntWritable is a serializable implementation for the primitive int type.
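
For most jobs, the built-in types like Text, IntWritable, and LongWritable are all one needs. For a custom value type, one would implement the Writable interface directly (a key type would additionally need to implement the org.apache.hadoop.io.WritableComparable sub-interface so that it can be sorted). The following is a minimal, hypothetical sketch; WordPair is not part of Hadoop or of the wordcount example:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Hypothetical example of a custom value type that the Hadoop MapReduce
// framework can serialize. The Writable contract is just these two methods.
public class WordPair implements Writable {
    private String first;
    private String second;

    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);   // serialize the fields in a fixed order
        out.writeUTF(second);
    }

    public void readFields(DataInput in) throws IOException {
        first = in.readUTF();  // deserialize in exactly the same order
        second = in.readUTF();
    }
}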

The map() method uses the passed-in parameter of type org.apache.hadoop.mapreduce.Mapper.Context to write its output.

Similarly, the reduce() method uses the passed-in parameter of type org.apache.hadoop.mapreduce.Reducer.Context to write its result.

In the map() method, the input is a line of text from the input document. We tokenize the line into words and, for each word in the line, output the number 1 as the count.

As can be inferred from Figure-1 above, the output of the map phase, once sorted and grouped, is a word (key K2) and a list of ones (value List<V2>).

In the reduce() method, we sum up all the ones from the input list value and output the word (key K3) and the sum (value V3).

An instance of the class org.apache.hadoop.mapreduce.Job specifies the Hadoop MapReduce job to be run in our 3-node cluster.

In the main() method, we create a Job object and specify the Mapper class (TokenizerMapper) and Reducer class (IntSumReducer).

Notice that the Reducer class (IntSumReducer) has also been specified as the Combiner class. The Combiner is basically an optimization step that runs on the node executing the Mapper. Realize that the Mapper outputs each tokenized word (from the input text) with a count of 1, and the same word can appear multiple times in the input text. Rather than sending every occurrence of a word with a count of 1 to the Reducer node and processing it there, the Combiner sums up the counts for the same word locally before they are sent across the network.
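
As a made-up illustration, suppose one map task encounters the word "the" three times in its input split. Without a combiner, three separate records cross the network to the reducer; with IntSumReducer acting as the combiner, only one does:

map output without a combiner:   ("the", 1) ("the", 1) ("the", 1)
map output with the combiner:    ("the", 3)

This local aggregation is visible in Output-2 above: the combiner consumed 9373 records (Combine input records) but emitted only 3277 (Combine output records), which is exactly the number of records the reducer received (Reduce input records).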

Next, we specify the input and output paths.

Finally, we invoke the waitForCompletion() method on the Job object to submit the Hadoop MapReduce job and wait for it to finish. The return value from the waitForCompletion() method is the completion status indicating success (true) or failure (false).

One can get information about the JobTracker by typing in the following URL in a web browser:

http://hadoop-master:50030

The following diagram in Figure-2 shows the result:

Figure-2: JobTracker

One can get information about the completed Hadoop MapReduce jobs by clicking on Job Tracker History as shown in Figure-2 above. The following is the result of clicking the link:

Figure-3: JobTracker History

Clicking on job_201312271637_0002 as shown in Figure-3 above, we see the following:

Figure-4: Hadoop Job

References

Hadoop Quick Notes :: Part - 1

Hadoop Quick Notes :: Part - 2