Hadoop Quick Notes :: Part - 4


Bhaskar S 12/30/2013


Introduction

In Part-3, we explored the Hadoop MapReduce framework by executing the wordcount example (that is bundled with the Hadoop 1.x distribution in the hadoop-examples-1.2.1.jar file).

Also, we took a peek at the source code of wordcount to get an understanding of how it works.

Now we will continue from where we left off and implement our own Hadoop MapReduce program.

Hands-on with Hadoop MapReduce 1.x - Part 2

Let us implement our own version of the word count program called EnhancedWordCount. Our implementation will be a cleaner version (we will strip out punctuation characters and convert all words to lowercase) and will count only words that are 5 letters or more.

The first step is to implement our custom org.apache.hadoop.mapreduce.Mapper.

The following is the Java class called com.polarsparc.hadoop.hmr.EnhancedWordCountMapper that implements our Mapper:

Listing-1
package com.polarsparc.hadoop.hmr;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/* Enhanced word count mapper that will count only words of size 5 letters or more */

public class EnhancedWordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static int MIN_WORD_SZ = 5;
    private final static String TOKENIZER = "\"()[]!.,;:-/ ";
    private final static IntWritable ONE = new IntWritable(1);
    
    private Text word = new Text();
    
    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(value.toString(), TOKENIZER);
        while (tokens.hasMoreTokens()) {
            String str = tokens.nextToken();
            if (str.length() >= MIN_WORD_SZ) {
                word.set(str.toLowerCase());
                context.write(word, ONE);
            }
        }
    }
}

As can be inferred from Listing-1 above, we tokenize the input line using the common punctuation characters as delimiters, check that each word is at least 5 letters long, convert the identified word to lowercase, and write it out with a count of 1.
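To see this tokenization in action outside of Hadoop, the following is a minimal standalone sketch (the class name and sample sentence are our own, purely for illustration) that applies the same delimiter set and length filter as the Mapper:

import java.util.StringTokenizer;

/* Hypothetical standalone demo - NOT part of the MapReduce job */

public class TokenizeDemo {
    public static void main(String[] args) {
        String line = "Hello, World! (Hadoop/MapReduce)";
        // Same delimiter set as in EnhancedWordCountMapper
        StringTokenizer tokens = new StringTokenizer(line, "\"()[]!.,;:-/ ");
        while (tokens.hasMoreTokens()) {
            String str = tokens.nextToken();
            // Same minimum word size filter as in EnhancedWordCountMapper
            if (str.length() >= 5) {
                System.out.println(str.toLowerCase());
            }
        }
        // Prints: hello, world, hadoop, mapreduce (one per line)
    }
}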

The second step is to implement our custom org.apache.hadoop.mapreduce.Reducer.

The following is the Java class called com.polarsparc.hadoop.hmr.EnhancedWordCountReducer that implements our Reducer:

Listing-2
package com.polarsparc.hadoop.hmr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/* Enhanced word count reducer that will count only words of size 5 letters or more */

public class EnhancedWordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable val : values) {
            count += val.get();
        }
        result.set(count);
        context.write(key, result);
    }
}

As can be inferred from Listing-2 above, we sum up all the 1's from the input value list and output the word along with its corresponding count.
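To make the reduce semantics concrete, the following standalone sketch (our own illustration, not part of the job) mimics what the framework does between the map and reduce phases: group the emitted 1's by word in sorted key order, then sum each value list:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/* Hypothetical standalone demo - NOT part of the MapReduce job */

public class ShuffleAndReduceDemo {
    public static void main(String[] args) {
        // Simulated map output: (word, 1) pairs in emission order
        String[] mapOutput = { "things", "happen", "things", "happen" };

        // Shuffle/sort: group the values by key in sorted key order,
        // e.g. happen -> [1, 1] and things -> [1, 1]
        Map<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
        for (String word : mapOutput) {
            if (!grouped.containsKey(word)) {
                grouped.put(word, new ArrayList<Integer>());
            }
            grouped.get(word).add(1);
        }

        // Reduce: sum the value list for each key, just like our Reducer
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int count = 0;
            for (int one : entry.getValue()) {
                count += one;
            }
            System.out.println(entry.getKey() + "\t" + count);
        }
        // Prints: happen 2 and things 2
    }
}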

The final step is to implement the main Hadoop MapReduce job.

The following is the main Java class called com.polarsparc.hadoop.hmr.EnhancedWordCount:

Listing-3
package com.polarsparc.hadoop.hmr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class EnhancedWordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: EnhancedWordCount <in> <out>");
          System.exit(2);
        }
        
        Job job = new Job(conf, "EnhancedWordCount");
        job.setJarByClass(EnhancedWordCount.class);
        job.setMapperClass(EnhancedWordCountMapper.class);
        job.setCombinerClass(EnhancedWordCountReducer.class);
        job.setReducerClass(EnhancedWordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

As can be inferred from Listing-3 above, in the main() method we create a Job object and specify the Mapper class (EnhancedWordCountMapper) and the Reducer class (EnhancedWordCountReducer). Note that we also register EnhancedWordCountReducer as the Combiner; this is safe because summing counts is both associative and commutative, and the Reducer's input and output types are identical. Next, we specify the input and output paths. And finally, we invoke the waitForCompletion() method on the Job object to submit the Hadoop MapReduce job and wait for it to finish.

Hooray !!! We have implemented our first Hadoop MapReduce program.

Wait a minute. How do we know what we have implemented will work ???

How does one unit test this ???

Despair Not. Enter MRUnit.

MRUnit is a unit testing framework that is based on JUnit and can be used to unit test Hadoop MapReduce programs.

We will leverage MRUnit to test our EnhancedWordCount program.

Download the latest distribution of MRUnit from the Apache MRUnit website and add it to the project.
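If the project happens to be Maven based, MRUnit could instead be declared as a test dependency. The following is a sketch assuming version 1.0.0 with the hadoop1 classifier (the classifier selects the build of MRUnit compiled against the Hadoop 1.x API; adjust the version as appropriate):

<dependency>
    <groupId>org.apache.mrunit</groupId>
    <artifactId>mrunit</artifactId>
    <version>1.0.0</version>
    <classifier>hadoop1</classifier>
    <scope>test</scope>
</dependency>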

The MRUnit framework includes a driver for unit testing just the Mapper, another driver for unit testing just the Reducer, and yet another driver for unit testing the Mapper and Reducer together.

The following is the Java class called com.polarsparc.hadoop.hmr.test.EnhancedWordCountUnitTests that demonstrates the use of the MRUnit unit testing framework:

Listing-4
package com.polarsparc.hadoop.hmr.test;

import java.io.IOException;
import java.util.List;
import java.util.ArrayList;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.mrunit.types.Pair;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;

import org.junit.Before;
import org.junit.Test;
import org.junit.Assert;

import com.polarsparc.hadoop.hmr.EnhancedWordCountMapper;
import com.polarsparc.hadoop.hmr.EnhancedWordCountReducer;

public class EnhancedWordCountUnitTests {
    private final static IntWritable ONE = new IntWritable(1);
    private final static IntWritable TWO = new IntWritable(2);
    
    // The generic types have to match the ones specified in the Mapper
    private MapDriver<Object, Text, Text, IntWritable> mapDriver;
    
    // The generic types have to match the ones specified in the Reducer
    private ReduceDriver<Text,IntWritable,Text,IntWritable> reduceDriver;
    
    // Must specify six generic types - input and output of the Mapper and
    // the output of the Reducer
    private MapReduceDriver<Object, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;
    
    @Before
    public void setup() {
        EnhancedWordCountMapper mapper = new EnhancedWordCountMapper();
        EnhancedWordCountReducer reducer = new EnhancedWordCountReducer();
        
        mapDriver = MapDriver.newMapDriver(mapper);
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
    }
    
    @Test
    public void testEnhancedWordCountMapper()
        throws IOException {
        // Provide two lines of input
        mapDriver.withInput(new Text(), new Text("Things do not happen."));
        mapDriver.withInput(new Text(), new Text("Things are made to happen."));
        
        // Expect the word counts in the exact order as emitted from the input
        mapDriver.withOutput(new Text("things"), new IntWritable(1));
        mapDriver.withOutput(new Text("happen"), new IntWritable(1));
        mapDriver.withOutput(new Text("things"), new IntWritable(1));
        mapDriver.withOutput(new Text("happen"), new IntWritable(1));
        
        mapDriver.runTest();
    }
    
    @Test
    public void assertEnhancedWordCountMapper()
        throws IOException {
        // Provide two lines of input
        mapDriver.withInput(new Text(), new Text("Things do not happen."));
        mapDriver.withInput(new Text(), new Text("Things are made to happen."));
        
        List<Pair<Text, IntWritable>> output = mapDriver.run();
        
        // We should see four outputs from the map function
        Assert.assertEquals(4, output.size());
        
        // The first output from map should be the word 'things' with count 1
        Assert.assertEquals(new Text("things"), output.get(0).getFirst());
        Assert.assertEquals(ONE, output.get(0).getSecond());
        
        // The last output from map should be the word 'happen' with count 1
        Assert.assertEquals(new Text("happen"), output.get(3).getFirst());
        Assert.assertEquals(ONE, output.get(3).getSecond());
    }
    
    @Test
    public void testEnhancedWordCountReducer()
        throws IOException {
        // This is for the word things
        List<IntWritable> thingsList = new ArrayList<IntWritable>();
        thingsList.add(new IntWritable(1));
        thingsList.add(new IntWritable(1));
        
        // This is for the word happen
        List<IntWritable> happenList = new ArrayList<IntWritable>();
        happenList.add(new IntWritable(1));
        happenList.add(new IntWritable(1));
        
        // Input the word and the values list. Keys must be sorted
        reduceDriver.withInput(new Text("happen"), happenList);
        reduceDriver.withInput(new Text("things"), thingsList);
        
        // Expect the word and its count. Keys must be in sorted order,
        // matching the order of the keys in the input
        reduceDriver.withOutput(new Text("happen"), TWO);
        reduceDriver.withOutput(new Text("things"), TWO);
        
        reduceDriver.runTest();
    }
    
    @Test
    public void assertEnhancedWordCountReducer()
        throws IOException {
        // This is for the word things
        List<IntWritable> thingsList = new ArrayList<IntWritable>();
        thingsList.add(new IntWritable(1));
        thingsList.add(new IntWritable(1));
        
        // This is for the word happen
        List<IntWritable> happenList = new ArrayList<IntWritable>();
        happenList.add(new IntWritable(1));
        happenList.add(new IntWritable(1));
        
        // Input the word and the values list. Keys must be sorted
        reduceDriver.withInput(new Text("happen"), happenList);
        reduceDriver.withInput(new Text("things"), thingsList);
        
        List<Pair<Text, IntWritable>> output = reduceDriver.run();
        
        // We should see two outputs from the reduce function
        Assert.assertEquals(2, output.size());
        
        // The first output from reduce should be the word 'happen' with count 2
        Assert.assertEquals(new Text("happen"), output.get(0).getFirst());
        Assert.assertEquals(TWO, output.get(0).getSecond());
        
        // The last output from reduce should be the word 'things' with count 2
        Assert.assertEquals(new Text("things"), output.get(1).getFirst());
        Assert.assertEquals(TWO, output.get(1).getSecond());
    }
    
    @Test
    public void testEnhancedWordCountMapReduce()
        throws IOException {
        // Provide two lines of input
        mapReduceDriver.withInput(new Text(), new Text("Things do not happen."));
        mapReduceDriver.withInput(new Text(), new Text("Things are made to happen."));
        
        // Expect the word and its count. Keys must be in sorted order
        mapReduceDriver.withOutput(new Text("happen"), TWO);
        mapReduceDriver.withOutput(new Text("things"), TWO);
        
        mapReduceDriver.runTest();
    }
    
    @Test
    public void assertEnhancedWordCountMapReduce()
        throws IOException {
        // Input two lines to the map function
        mapReduceDriver.withInput(new Text(), new Text("Things do not happen."));
        mapReduceDriver.withInput(new Text(), new Text("Things are made to happen."));
        
        List<Pair<Text, IntWritable>> output = mapReduceDriver.run();
        
        // We should see two outputs from the reduce function
        Assert.assertEquals(2, output.size());
        
        // The first output from reduce should be the word 'happen' with count 2
        Assert.assertEquals(new Text("happen"), output.get(0).getFirst());
        Assert.assertEquals(TWO, output.get(0).getSecond());
        
        // The last output from reduce should be the word 'things' with count 2
        Assert.assertEquals(new Text("things"), output.get(1).getFirst());
        Assert.assertEquals(TWO, output.get(1).getSecond());
    }
}

Let us try to understand the source code from Listing-4 above.

The Java class org.apache.hadoop.mrunit.mapreduce.MapDriver is the driver to unit test the Mapper.

The Java class org.apache.hadoop.mrunit.mapreduce.ReduceDriver is the driver to unit test the Reducer.

The Java class org.apache.hadoop.mrunit.mapreduce.MapReduceDriver is the driver to unit test both the Mapper and Reducer together.

In the method setup(), we first create an instance of our Mapper EnhancedWordCountMapper and an instance of our Reducer EnhancedWordCountReducer.

Then, we initialize an instance of the MapDriver specifying our instance of the Mapper, an instance of the ReduceDriver specifying our instance of the Reducer, and an instance of the MapReduceDriver specifying our instances of the Mapper and Reducer.

The generic parameter types of the MapDriver must match the generic parameter types of our Mapper, which is Object,Text,Text,IntWritable in our case.

Also, the generic parameter types of the ReduceDriver must match the generic parameter types of our Reducer, which is Text,IntWritable,Text,IntWritable in our case.

Last but not least, there are 6 generic parameter types for the MapReduceDriver: the first 4 must match the generic parameter types of our Mapper and the last 2 must match the output generic parameter types of our Reducer. The end result is Object,Text,Text,IntWritable,Text,IntWritable in our case.

In Listing-4 above, we demonstrate unit testing using two approaches - first by specifying the input along with the expected output result, and second by specifying the input and testing the output using assertions.

In the method testEnhancedWordCountMapper(), we unit test our Mapper. This approach specifies the input as well as the expected result. The input is provided through the method withInput() and the expected output through the method withOutput().

In the method assertEnhancedWordCountMapper(), again we unit test our Mapper. In this approach, we specify the input and assert the expected results. Note that each result comes as an org.apache.hadoop.mrunit.types.Pair, which encapsulates both the key and the value. To get the key, invoke the method getFirst(); to get the value, invoke the method getSecond().

Now for the Reducer.

In the method testEnhancedWordCountReducer(), we unit test our Reducer. Recollect from Part-3 that the input to the reduce() method is a value list. This approach specifies the input as well as the expected result.

In the method assertEnhancedWordCountReducer(), again we unit test our Reducer. In this approach, we specify the input and assert the expected results.

And, finally for the Mapper and Reducer together.

In the method testEnhancedWordCountMapReduce(), we unit test our Mapper and Reducer together. This approach specifies the input as well as the expected result.

In the method assertEnhancedWordCountMapReduce(), again we unit test our Mapper and Reducer together. In this approach, we specify the input and assert the expected results.

Executing the above program from Listing-4 in Eclipse results in a successful execution, as shown in Figure-1 below:

MRUnit Test
Figure-1

From Figure-1 above, it is clear that we have successfully written and executed unit tests for our first Hadoop MapReduce program.

What we have not yet done is an end-to-end integration test with a small input to actually prove that our Hadoop MapReduce program really works.

Interestingly, there is a way to perform the end-to-end integration test locally without spinning up the 3-node cluster.

Create a file called hadoop-local.xml in the directory $HADOOP_PREFIX/conf whose contents look as shown below. Setting fs.default.name to file:/// makes Hadoop use the local filesystem instead of HDFS, and setting mapred.job.tracker to local runs the job in-process using the local job runner:

Listing-5
<?xml version="1.0" ?>
<configuration>

    <property>
        <name>fs.default.name</name>
        <value>file:///</value>
    </property>

    <property>
        <name>mapred.job.tracker</name>
        <value>local</value>
    </property>

</configuration>

Package the classes of our EnhancedWordCount program into a jar file called hadoop-polarsparc-1.0.jar in the directory /tmp.
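For example, assuming the compiled classes reside under a bin directory in the project workspace (an assumption about the project layout), the jar file could be created by issuing the following command:

jar cvf /tmp/hadoop-polarsparc-1.0.jar -C bin com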

To perform the end-to-end integration test locally, create a small text file in the directory /tmp/hadoop/input and issue the following command (the -conf option is consumed by the GenericOptionsParser in our main() method, so only the input and output paths reach our program):

$HADOOP_PREFIX/bin/hadoop jar /tmp/hadoop-polarsparc-1.0.jar com.polarsparc.hadoop.hmr.EnhancedWordCount -conf $HADOOP_PREFIX/conf/hadoop-local.xml /tmp/hadoop/input /tmp/hadoop/output

The following will be the output:

Output-1

13/12/31 16:50:06 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/12/31 16:50:07 INFO input.FileInputFormat: Total input paths to process : 1
13/12/31 16:50:07 WARN snappy.LoadSnappy: Snappy native library not loaded
13/12/31 16:50:07 INFO mapred.JobClient: Running job: job_local1285465663_0001
13/12/31 16:50:07 INFO mapred.LocalJobRunner: Waiting for map tasks
13/12/31 16:50:07 INFO mapred.LocalJobRunner: Starting task: attempt_local1285465663_0001_m_000000_0
13/12/31 16:50:07 INFO util.ProcessTree: setsid exited with exit code 0
13/12/31 16:50:07 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4b2d5c2b
13/12/31 16:50:07 INFO mapred.MapTask: Processing split: file:/tmp/hadoop/input/short-story.txt:0+604
13/12/31 16:50:07 INFO mapred.MapTask: io.sort.mb = 100
13/12/31 16:50:07 INFO mapred.MapTask: data buffer = 79691776/99614720
13/12/31 16:50:07 INFO mapred.MapTask: record buffer = 262144/327680
13/12/31 16:50:07 INFO mapred.MapTask: Starting flush of map output
13/12/31 16:50:07 INFO mapred.MapTask: Finished spill 0
13/12/31 16:50:07 INFO mapred.Task: Task:attempt_local1285465663_0001_m_000000_0 is done. And is in the process of commiting
13/12/31 16:50:07 INFO mapred.LocalJobRunner: 
13/12/31 16:50:07 INFO mapred.Task: Task 'attempt_local1285465663_0001_m_000000_0' done.
13/12/31 16:50:07 INFO mapred.LocalJobRunner: Finishing task: attempt_local1285465663_0001_m_000000_0
13/12/31 16:50:07 INFO mapred.LocalJobRunner: Map task executor complete.
13/12/31 16:50:07 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@2921ef9
13/12/31 16:50:07 INFO mapred.LocalJobRunner: 
13/12/31 16:50:07 INFO mapred.Merger: Merging 1 sorted segments
13/12/31 16:50:07 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 476 bytes
13/12/31 16:50:07 INFO mapred.LocalJobRunner: 
13/12/31 16:50:07 INFO mapred.Task: Task:attempt_local1285465663_0001_r_000000_0 is done. And is in the process of commiting
13/12/31 16:50:07 INFO mapred.LocalJobRunner: 
13/12/31 16:50:07 INFO mapred.Task: Task attempt_local1285465663_0001_r_000000_0 is allowed to commit now
13/12/31 16:50:07 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1285465663_0001_r_000000_0' to /tmp/hadoop/output
13/12/31 16:50:07 INFO mapred.LocalJobRunner: reduce > reduce
13/12/31 16:50:07 INFO mapred.Task: Task 'attempt_local1285465663_0001_r_000000_0' done.
13/12/31 16:50:08 INFO mapred.JobClient:  map 100% reduce 100%
13/12/31 16:50:08 INFO mapred.JobClient: Job complete: job_local1285465663_0001
13/12/31 16:50:08 INFO mapred.JobClient: Counters: 20
13/12/31 16:50:08 INFO mapred.JobClient:   File Output Format Counters 
13/12/31 16:50:08 INFO mapred.JobClient:     Bytes Written=346
13/12/31 16:50:08 INFO mapred.JobClient:   File Input Format Counters 
13/12/31 16:50:08 INFO mapred.JobClient:     Bytes Read=604
13/12/31 16:50:08 INFO mapred.JobClient:   FileSystemCounters
13/12/31 16:50:08 INFO mapred.JobClient:     FILE_BYTES_READ=32336
13/12/31 16:50:08 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=135280
13/12/31 16:50:08 INFO mapred.JobClient:   Map-Reduce Framework
13/12/31 16:50:08 INFO mapred.JobClient:     Reduce input groups=35
13/12/31 16:50:08 INFO mapred.JobClient:     Map output materialized bytes=480
13/12/31 16:50:08 INFO mapred.JobClient:     Combine output records=35
13/12/31 16:50:08 INFO mapred.JobClient:     Map input records=8
13/12/31 16:50:08 INFO mapred.JobClient:     Reduce shuffle bytes=0
13/12/31 16:50:08 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
13/12/31 16:50:08 INFO mapred.JobClient:     Reduce output records=35
13/12/31 16:50:08 INFO mapred.JobClient:     Spilled Records=70
13/12/31 16:50:08 INFO mapred.JobClient:     Map output bytes=437
13/12/31 16:50:08 INFO mapred.JobClient:     Total committed heap usage (bytes)=504365056
13/12/31 16:50:08 INFO mapred.JobClient:     CPU time spent (ms)=0
13/12/31 16:50:08 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
13/12/31 16:50:08 INFO mapred.JobClient:     SPLIT_RAW_BYTES=103
13/12/31 16:50:08 INFO mapred.JobClient:     Map output records=38
13/12/31 16:50:08 INFO mapred.JobClient:     Combine input records=38
13/12/31 16:50:08 INFO mapred.JobClient:     Reduce input records=35

Voila !!! Did it really work ???

Let us check the output directory /tmp/hadoop/output by issuing the following command:

ls -l /tmp/hadoop/output

The following will be the output:

Output-2

total 4
-rwxrwxrwx 1 bswamina bswamina 334 Dec 31 16:50 part-r-00000
-rwxrwxrwx 1 bswamina bswamina   0 Dec 31 16:50 _SUCCESS

Excellent !!! Our Hadoop MapReduce program did complete successfully.

Let us tail the contents of the file /tmp/hadoop/output/part-r-00000 by issuing the following command:

tail /tmp/hadoop/output/part-r-00000

The following will be the output:

Output-3

their	1
there	1
three	1
times	1
truly	1
truth	1
village	1
villagers	1
watched	1
whole	1

References

Hadoop Quick Notes :: Part - 1

Hadoop Quick Notes :: Part - 2

Hadoop Quick Notes :: Part - 3