Hadoop Quick Notes :: Part - 5


Bhaskar S 01/01/2014


Introduction

In Part-4, we implemented and tested our first Hadoop MapReduce program called EnhancedWordCount.

Also, we demonstrated how to use the MRUnit framework to unit test our Hadoop MapReduce program.

Now we will continue our journey to implement a more advanced Hadoop MapReduce program with custom Writable classes.

Hands-on with Hadoop MapReduce 1.x - Part 3

Let us implement a Hadoop MapReduce program to analyze stock data from 2009 and 2010 and determine when each stock hit its low and high closing prices.

We need some historical stock data for this, which we can get from the website Historical Data for S&P 500 Stocks.

Download the full historical data set and save it in a file called historical.txt.

Each line in this file is a comma-delimited record with the format: Date (in YYYYMMDD format), Ticker, Open, High, Low, Close, Volume. For example: 20090904,A,25.37,25.92,25.1475,25.86,32556

In the Mapper, we want each line to be parsed and encapsulated into a custom object that captures the stock ticker information.

The following is the Java class com.polarsparc.hadoop.hmr.StockMapOutputWritable that encapsulates the stock ticker information and implements the Writable interface:

Listing-1
package com.polarsparc.hadoop.hmr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class StockMapOutputWritable implements Writable {
    private String symbol;
    private String date;
    private double close;
    private long volume;
    
    public String getSymbol() {
        return symbol;
    }
    
    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }
    
    public String getDate() {
        return date;
    }
    
    public void setDate(String date) {
        this.date = date;
    }
    
    public double getClose() {
        return close;
    }
    
    public void setClose(double close) {
        this.close = close;
    }
    
    public long getVolume() {
        return volume;
    }
    
    public void setVolume(long volume) {
        this.volume = volume;
    }
    
    @Override
    public void readFields(DataInput input)
        throws IOException {
        symbol = input.readUTF();
        date = input.readUTF();
        close = input.readDouble();
        volume = input.readLong();
    }

    @Override
    public void write(DataOutput output)
        throws IOException {
        output.writeUTF(symbol);
        output.writeUTF(date);
        output.writeDouble(close);
        output.writeLong(volume);
    }
    
    @Override
    public String toString() {
        return symbol + "," + date + "," + close + "," + volume;
    }
}

As can be inferred from Listing-1 above, the stock ticker object has fields for the symbol, the date of the close price, the close price, and the volume.
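
To see how Hadoop moves this object between the Mapper and the Reducer, the following is a minimal standalone sketch (not part of the MapReduce job; the class name WritableRoundTrip and the sample field values are purely for illustration) that serializes an instance of StockMapOutputWritable to a byte array using write() and deserializes it back using readFields():

package com.polarsparc.hadoop.hmr;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class WritableRoundTrip {
    public static void main(String[] args) throws Exception {
        StockMapOutputWritable in = new StockMapOutputWritable();
        in.setSymbol("A");
        in.setDate("20090904");
        in.setClose(25.86);
        in.setVolume(32556);

        // Serialize the fields using write(), in the same order they are written
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bos));

        // Deserialize into a fresh instance using readFields()
        StockMapOutputWritable out = new StockMapOutputWritable();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        // Prints: A,20090904,25.86,32556
        System.out.println(out);
    }
}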

Next it is time to implement our custom org.apache.hadoop.mapreduce.Mapper.

The following is the Java class called com.polarsparc.hadoop.hmr.StockAnalysisMapper that implements our Mapper:

Listing-2
package com.polarsparc.hadoop.hmr;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class StockAnalysisMapper extends Mapper<Object, Text, Text, StockMapOutputWritable> {
    private final static int TOKEN_COUNT = 7;
    private final static String TOKENIZER = ",";
    
    private Text outkey = new Text();
    
    private final static Log LOG = LogFactory.getLog(StockAnalysisMapper.class.getName());
    
    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
        LOG.info("Input value: " + value);
        
        // value string format: YYYYMMDD,Ticker,Open,High,Low,Close,Volume
        String[] tokens = value.toString().split(TOKENIZER);
        if (tokens.length == TOKEN_COUNT) {
            String symbol = tokens[1].toUpperCase();
            String date = tokens[0];
            double close = Double.parseDouble(tokens[5]);
            long volume = Long.parseLong(tokens[6]);
            
            StockMapOutputWritable outval = new StockMapOutputWritable();
            outval.setSymbol(symbol);
            outval.setDate(date);
            outval.setClose(close);
            outval.setVolume(volume);
            
            LOG.info(outval);
            
            outkey.set(symbol);
            
            context.write(outkey, outval);
        }
    }
}

As can be inferred from Listing-2 above, we tokenize the input text using a comma, check that the token count is 7, create and populate an instance of StockMapOutputWritable, and finally write the symbol as the output key from the Mapper and our custom stock object as the output value. For example, the input line 20090904,A,25.37,25.92,25.1475,25.86,32556 would be emitted as the key A with the value A,20090904,25.86,32556.

In the Reducer, we want to iterate over the list of StockMapOutputWritable values and determine the low close and high close information for the stock symbol.

The following is the Java class com.polarsparc.hadoop.hmr.StockReduceOutputWritable that encapsulates this stock analysis information and implements the Writable interface:

Listing-3
package com.polarsparc.hadoop.hmr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class StockReduceOutputWritable implements Writable {
    private String symbol;
    private String lowDate;
    private double lowClose;
    private long lowVolume;
    private String highDate;
    private double highClose;
    private long highVolume;
    
    public String getSymbol() {
        return symbol;
    }
    
    public String getLowDate() {
        return lowDate;
    }
    
    public double getLowClose() {
        return lowClose;
    }
    
    public long getLowVolume() {
        return lowVolume;
    }
    
    public String getHighDate() {
        return highDate;
    }
    
    public double getHighClose() {
        return highClose;
    }
    
    public long getHighVolume() {
        return highVolume;
    }
    
    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }
    
    public void setLowDate(String lowDate) {
        this.lowDate = lowDate;
    }
    
    public void setLowClose(double lowClose) {
        this.lowClose = lowClose;
    }
    
    public void setLowVolume(long lowVolume) {
        this.lowVolume = lowVolume;
    }
    
    public void setHighDate(String highDate) {
        this.highDate = highDate;
    }
    
    public void setHighClose(double highClose) {
        this.highClose = highClose;
    }
    
    public void setHighVolume(long highVolume) {
        this.highVolume = highVolume;
    }
    
    @Override
    public void readFields(DataInput input)
        throws IOException {
        symbol = input.readUTF();
        lowDate = input.readUTF();
        lowClose = input.readDouble();
        lowVolume = input.readLong();
        highDate = input.readUTF();
        highClose = input.readDouble();
        highVolume = input.readLong();
    }

    @Override
    public void write(DataOutput output)
        throws IOException {
        output.writeUTF(symbol);
        output.writeUTF(lowDate);
        output.writeDouble(lowClose);
        output.writeLong(lowVolume);
        output.writeUTF(highDate);
        output.writeDouble(highClose);
        output.writeLong(highVolume);
    }
    
    @Override
    public String toString() {
        return symbol + "," + lowDate + "," + lowClose + "," + lowVolume + "," +
            highDate + "," + highClose + "," + highVolume;
    }
}

As can be inferred from Listing-3 above, the custom class encapsulates the symbol along with the date, the close price, and the volume at the low price point, and similarly the date, the close price, and the volume at the high price point.

Next it is time to implement our custom org.apache.hadoop.mapreduce.Reducer.

The following is the Java class called com.polarsparc.hadoop.hmr.StockAnalysisReducer that implements our Reducer:

Listing-4
package com.polarsparc.hadoop.hmr;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

public class StockAnalysisReducer extends Reducer<Text,StockMapOutputWritable,Text,StockReduceOutputWritable> {
    private double low = 99999.99;
    private double high = 0.0;
    
    private StockReduceOutputWritable result = new StockReduceOutputWritable();
    
    private final static Log LOG = LogFactory.getLog(StockAnalysisReducer.class.getName());

    @Override
    public void reduce(Text key, Iterable<StockMapOutputWritable> values, Context context)
        throws IOException, InterruptedException {
        // Reset the low and high trackers for each key (stock symbol), since the
        // same Reducer instance is reused to process all the keys in its partition
        low = 99999.99;
        high = 0.0;
        
        result.setSymbol(key.toString());
        for (StockMapOutputWritable val : values) {
            LOG.info("For val date: " + val.getDate() + ", Input: " + val);
            
            LOG.debug("Val close: " + val.getClose() + ", Low: " + low);
            
            if (val.getClose() < low) {
                low = val.getClose();
                result.setLowClose(val.getClose());
                result.setLowDate(val.getDate());
                result.setLowVolume(val.getVolume());
            }
            
            LOG.debug("Val close: " + val.getClose() + ", High: " + high);
            
            if (val.getClose() > high) {
                high = val.getClose();
                result.setHighClose(val.getClose());
                result.setHighDate(val.getDate());
                result.setHighVolume(val.getVolume());
            }
            
            LOG.info("For val date: " + val.getDate() + ", Output: " + result);
        }
        context.write(key, result);
    }
}

As can be inferred from Listing-4 above, we create an instance of StockReduceOutputWritable and reset the low and high trackers at the start of each reduce() call, since the same Reducer instance is reused to process all the keys in its partition. As we iterate over the list of StockMapOutputWritable values to determine the low and high price points, we populate the information in the instance of StockReduceOutputWritable. Finally, we write the symbol as the output key from the Reducer and our custom stock object as the output value.

The final step is to implement the main Hadoop MapReduce job.

The following is the main Java class called com.polarsparc.hadoop.hmr.StockAnalysis:

Listing-5
package com.polarsparc.hadoop.hmr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class StockAnalysis {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: com.polarsparc.hadoop.hmr.StockAnalysis <in> <out>");
          System.exit(2);
        }
        
        Job job = new Job(conf, "StockAnalysis");
        job.setJarByClass(StockAnalysis.class);
        job.setMapperClass(StockAnalysisMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(StockMapOutputWritable.class);
        job.setReducerClass(StockAnalysisReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(StockReduceOutputWritable.class);
        
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

As can be inferred from Listing-5 above, in the main() function we create a Job object and specify the Mapper class (StockAnalysisMapper), the output key class (Text) and value class (StockMapOutputWritable) from the Mapper, the Reducer class (StockAnalysisReducer), and the output key class (Text) and value class (StockReduceOutputWritable) from the Reducer. Next, we specify the input and output paths. Finally, we invoke the waitForCompletion() method on the Job object to submit the Hadoop MapReduce job and wait for it to finish.

We will leverage MRUnit to test our StockAnalysis program.

The following is the Java class called com.polarsparc.hadoop.hmr.test.StockMapReduceUnitTests that demonstrates the use of the MRUnit unit testing framework:

Listing-6
package com.polarsparc.hadoop.hmr.test;

import java.io.IOException;
import java.util.List;
import java.util.ArrayList;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mrunit.types.Pair;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.junit.Before;
import org.junit.Test;
import org.junit.Assert;

import com.polarsparc.hadoop.hmr.StockMapOutputWritable;
import com.polarsparc.hadoop.hmr.StockAnalysisMapper;
import com.polarsparc.hadoop.hmr.StockReduceOutputWritable;
import com.polarsparc.hadoop.hmr.StockAnalysisReducer;

public class StockMapReduceUnitTests {
    // The generic types have to match the ones specified in the StockAnalysisMapper
    private MapDriver<Object, Text, Text, StockMapOutputWritable> mapDriver;
    
    // The generic types have to match the ones specified in the StockAnalysisReducer
    private ReduceDriver<Text,StockMapOutputWritable,Text,StockReduceOutputWritable> reduceDriver;
    
    // Must specify six generic types - the input and output of the StockAnalysisMapper
    // and the output of the StockAnalysisReducer
    private MapReduceDriver<Object, Text, Text, StockMapOutputWritable, Text,StockReduceOutputWritable> mapReduceDriver;
    
    @Before
    public void setup() {
        StockAnalysisMapper mapper = new StockAnalysisMapper();
        StockAnalysisReducer reducer = new StockAnalysisReducer();
        
        mapDriver = MapDriver.newMapDriver(mapper);
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
    }
    
    @Test
    public void assertStockMapper()
        throws IOException {
        // Provide three lines of input to the map function
        mapDriver.withInput(new Text(), new Text("20090904,A,25.37,25.92,25.1475,25.86,32556"));
        mapDriver.withInput(new Text(), new Text("20090909,A,26.31,27.19,26.16,27.15,36764"));
        mapDriver.withInput(new Text(), new Text("20090911,A,27.88,28.16,27.75,28.05,43907"));
        
        List<Pair<Text, StockMapOutputWritable>> output = mapDriver.run();
        
        // We should see three outputs from the map function
        Assert.assertEquals(3, output.size());
        
        // The last output from map should be the StockMapOutputWritable with:
        // Date: 20090911, Close: 28.05, Volume: 43907
        Assert.assertEquals("20090911", output.get(2).getSecond().getDate());
        Assert.assertEquals(new DoubleWritable(28.05), new DoubleWritable(output.get(2).getSecond().getClose()));
        Assert.assertEquals(43907, output.get(2).getSecond().getVolume());
    }
    
    @Test
    public void assertStockReducer()
        throws IOException {
        StockMapOutputWritable v1 = new StockMapOutputWritable();
        v1.setSymbol("A");
        v1.setDate("20090904");
        v1.setClose(25.86);
        v1.setVolume(32556);
        
        StockMapOutputWritable v2 = new StockMapOutputWritable();
        v2.setSymbol("A");
        v2.setDate("20090909");
        v2.setClose(27.15);
        v2.setVolume(36764);
        
        StockMapOutputWritable v3 = new StockMapOutputWritable();
        v3.setSymbol("A");
        v3.setDate("20090911");
        v3.setClose(28.05);
        v3.setVolume(43907);
        
        List<StockMapOutputWritable> aList = new ArrayList<StockMapOutputWritable>();
        aList.add(v1);
        aList.add(v2);
        aList.add(v3);
        
        // Input the symbol and the values list
        reduceDriver.withInput(new Text("A"), aList);
        
        List<Pair<Text, StockReduceOutputWritable>> output = reduceDriver.run();
        
        // We should see one output from the reduce function
        Assert.assertEquals(1, output.size());
        
        // The output from reduce should be the StockReduceOutputWritable with:
        // lowDate: 20090904, lowClose: 25.86, lowVolume: 32556
        // highDate: 20090911, highClose: 28.05, highVolume: 43907
        Assert.assertEquals("20090904", output.get(0).getSecond().getLowDate());
        Assert.assertEquals(new DoubleWritable(25.86), new DoubleWritable(output.get(0).getSecond().getLowClose()));
        Assert.assertEquals(32556, output.get(0).getSecond().getLowVolume());
        Assert.assertEquals("20090911", output.get(0).getSecond().getHighDate());
        Assert.assertEquals(new DoubleWritable(28.05), new DoubleWritable(output.get(0).getSecond().getHighClose()));
        Assert.assertEquals(43907, output.get(0).getSecond().getHighVolume());
    }
    
    @Test
    public void assertStockMapReduce()
        throws IOException {
        // Input three lines to the map function
        mapReduceDriver.withInput(new Text(), new Text("20090904,A,25.37,25.92,25.1475,25.86,32556"));
        mapReduceDriver.withInput(new Text(), new Text("20090909,A,26.31,27.19,26.16,27.15,36764"));
        mapReduceDriver.withInput(new Text(), new Text("20090911,A,27.88,28.16,27.75,28.05,43907"));
        
        List<Pair<Text, StockReduceOutputWritable>> output = mapReduceDriver.run();
        
        // We should see one output from the reduce function
        Assert.assertEquals(1, output.size());
        
        // The output from reduce should be the StockReduceOutputWritable with:
        // lowDate: 20090904, lowClose: 25.86, lowVolume: 32556
        // highDate: 20090911, highClose: 28.05, highVolume: 43907
        Assert.assertEquals("20090904", output.get(0).getSecond().getLowDate());
        Assert.assertEquals(new DoubleWritable(25.86), new DoubleWritable(output.get(0).getSecond().getLowClose()));
        Assert.assertEquals(32556, output.get(0).getSecond().getLowVolume());
        Assert.assertEquals("20090911", output.get(0).getSecond().getHighDate());
        Assert.assertEquals(new DoubleWritable(28.05), new DoubleWritable(output.get(0).getSecond().getHighClose()));
        Assert.assertEquals(43907, output.get(0).getSecond().getHighVolume());
    }
}

In Listing-6 above, we demonstrate unit testing using assertions.

Executing the unit tests from Listing-6 in Eclipse results in a successful run, as shown in Figure-1 below:

Figure-1: MRUnit Test

From Figure-1 above, it is clear that we have successfully written and executed unit tests for our Stock Analysis Hadoop MapReduce program.

Next, we will perform an end-to-end integration test locally.

Package the classes of our StockAnalysis program into a jar file called hadoop-polarsparc-1.0.jar in the directory /tmp.
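
One way to package it (a sketch, assuming the compiled classes are under a directory named build/classes) is to issue the following command:

jar cvf /tmp/hadoop-polarsparc-1.0.jar -C build/classes com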

To perform the end-to-end integration test locally, create a smaller historical.txt file (we copied records up to the symbol AAPL - about 731 records) in the directory /tmp/hadoop/input and issue the following command:

$HADOOP_PREFIX/bin/hadoop jar /tmp/hadoop-polarsparc-1.0.jar com.polarsparc.hadoop.hmr.StockAnalysis -conf $HADOOP_PREFIX/conf/hadoop-local.xml /tmp/hadoop/input /tmp/hadoop/output

The following will be the output:

Output-1

14/01/01 21:30:26 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/01/01 21:30:26 INFO input.FileInputFormat: Total input paths to process : 1
14/01/01 21:30:26 WARN snappy.LoadSnappy: Snappy native library not loaded
14/01/01 21:30:27 INFO mapred.JobClient: Running job: job_local1821376773_0001
14/01/01 21:30:27 INFO mapred.LocalJobRunner: Waiting for map tasks
14/01/01 21:30:27 INFO mapred.LocalJobRunner: Starting task: attempt_local1821376773_0001_m_000000_0
14/01/01 21:30:27 INFO util.ProcessTree: setsid exited with exit code 0
14/01/01 21:30:27 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@146816b7
14/01/01 21:30:27 INFO mapred.MapTask: Processing split: file:/tmp/hadoop/input/historical.txt:0+31887
14/01/01 21:30:27 INFO mapred.MapTask: io.sort.mb = 100
14/01/01 21:30:27 INFO mapred.MapTask: data buffer = 79691776/99614720
14/01/01 21:30:27 INFO mapred.MapTask: record buffer = 262144/327680
14/01/01 21:30:27 INFO mapred.MapTask: Starting flush of map output
14/01/01 21:30:27 INFO mapred.MapTask: Finished spill 0
14/01/01 21:30:27 INFO mapred.Task: Task:attempt_local1821376773_0001_m_000000_0 is done. And is in the process of commiting
14/01/01 21:30:27 INFO mapred.LocalJobRunner: 
14/01/01 21:30:27 INFO mapred.Task: Task 'attempt_local1821376773_0001_m_000000_0' done.
14/01/01 21:30:27 INFO mapred.LocalJobRunner: Finishing task: attempt_local1821376773_0001_m_000000_0
14/01/01 21:30:27 INFO mapred.LocalJobRunner: Map task executor complete.
14/01/01 21:30:27 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@6eb1b8de
14/01/01 21:30:27 INFO mapred.LocalJobRunner: 
14/01/01 21:30:27 INFO mapred.Merger: Merging 1 sorted segments
14/01/01 21:30:27 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 26061 bytes
14/01/01 21:30:27 INFO mapred.LocalJobRunner: 
14/01/01 21:30:27 INFO mapred.Task: Task:attempt_local1821376773_0001_r_000000_0 is done. And is in the process of commiting
14/01/01 21:30:27 INFO mapred.LocalJobRunner: 
14/01/01 21:30:27 INFO mapred.Task: Task attempt_local1821376773_0001_r_000000_0 is allowed to commit now
14/01/01 21:30:27 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1821376773_0001_r_000000_0' to /tmp/hadoop/output
14/01/01 21:30:27 INFO mapred.LocalJobRunner: reduce > reduce
14/01/01 21:30:27 INFO mapred.Task: Task 'attempt_local1821376773_0001_r_000000_0' done.
14/01/01 21:30:28 INFO mapred.JobClient:  map 100% reduce 100%
14/01/01 21:30:28 INFO mapred.JobClient: Job complete: job_local1821376773_0001
14/01/01 21:30:28 INFO mapred.JobClient: Counters: 20
14/01/01 21:30:28 INFO mapred.JobClient:   File Output Format Counters 
14/01/01 21:30:28 INFO mapred.JobClient:     Bytes Written=161
14/01/01 21:30:28 INFO mapred.JobClient:   File Input Format Counters 
14/01/01 21:30:28 INFO mapred.JobClient:     Bytes Read=31887
14/01/01 21:30:28 INFO mapred.JobClient:   FileSystemCounters
14/01/01 21:30:28 INFO mapred.JobClient:     FILE_BYTES_READ=122125
14/01/01 21:30:28 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=188467
14/01/01 21:30:28 INFO mapred.JobClient:   Map-Reduce Framework
14/01/01 21:30:28 INFO mapred.JobClient:     Reduce input groups=3
14/01/01 21:30:28 INFO mapred.JobClient:     Map output materialized bytes=26065
14/01/01 21:30:28 INFO mapred.JobClient:     Combine output records=0
14/01/01 21:30:28 INFO mapred.JobClient:     Map input records=731
14/01/01 21:30:28 INFO mapred.JobClient:     Reduce shuffle bytes=0
14/01/01 21:30:28 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
14/01/01 21:30:28 INFO mapred.JobClient:     Reduce output records=3
14/01/01 21:30:28 INFO mapred.JobClient:     Spilled Records=1462
14/01/01 21:30:28 INFO mapred.JobClient:     Map output bytes=24597
14/01/01 21:30:28 INFO mapred.JobClient:     Total committed heap usage (bytes)=504365056
14/01/01 21:30:28 INFO mapred.JobClient:     CPU time spent (ms)=0
14/01/01 21:30:28 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
14/01/01 21:30:28 INFO mapred.JobClient:     SPLIT_RAW_BYTES=102
14/01/01 21:30:28 INFO mapred.JobClient:     Map output records=731
14/01/01 21:30:28 INFO mapred.JobClient:     Combine input records=0
14/01/01 21:30:28 INFO mapred.JobClient:     Reduce input records=731

Let us check the output directory /tmp/hadoop/output by issuing the following command:

ls -l /tmp/hadoop/output

The following will be the output:

Output-2

total 4
-rwxrwxrwx 1 bswamina bswamina 149 Jan  1 21:30 part-r-00000
-rwxrwxrwx 1 bswamina bswamina   0 Jan  1 21:30 _SUCCESS

Let us tail the contents of the file /tmp/hadoop/output/part-r-00000 by issuing the following command:

tail /tmp/hadoop/output/part-r-00000

The following will be the output:

Output-3

A	A,20090902,25.22,64614,20100429,37.23,23156
AA	AA,20100702,10.0,154945,20100429,37.23,23156
AAPL	AAPL,20100702,10.0,154945,20100618,274.074,280221

We have successfully implemented and tested our Stock Analysis Hadoop MapReduce program locally.

Time to deploy and test in our 3-node Hadoop cluster.

First, copy the file historical.txt into HDFS under /data/stocks.
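
The following commands (assuming the downloaded historical.txt file is in the local /tmp directory) can be used to copy the file into HDFS:

hadoop fs -mkdir /data/stocks

hadoop fs -put /tmp/historical.txt /data/stocks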

Now let us execute our Stock Analysis Hadoop MapReduce program by issuing the following command:

$HADOOP_PREFIX/bin/hadoop jar /tmp/hadoop-polarsparc-1.0.jar com.polarsparc.hadoop.hmr.StockAnalysis /data/stocks /data/output

The following will be the output:

Output-4

14/01/01 21:51:11 INFO input.FileInputFormat: Total input paths to process : 1
14/01/01 21:51:11 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/01/01 21:51:11 WARN snappy.LoadSnappy: Snappy native library not loaded
14/01/01 21:51:11 INFO mapred.JobClient: Running job: job_201401012020_0001
14/01/01 21:51:12 INFO mapred.JobClient:  map 0% reduce 0%
14/01/01 21:51:21 INFO mapred.JobClient:  map 100% reduce 0%
14/01/01 21:51:29 INFO mapred.JobClient:  map 100% reduce 33%
14/01/01 21:51:31 INFO mapred.JobClient:  map 100% reduce 100%
14/01/01 21:51:33 INFO mapred.JobClient: Job complete: job_201401012020_0001
14/01/01 21:51:33 INFO mapred.JobClient: Counters: 29
14/01/01 21:51:33 INFO mapred.JobClient:   Job Counters 
14/01/01 21:51:33 INFO mapred.JobClient:     Launched reduce tasks=1
14/01/01 21:51:33 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=9408
14/01/01 21:51:33 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
14/01/01 21:51:33 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
14/01/01 21:51:33 INFO mapred.JobClient:     Launched map tasks=1
14/01/01 21:51:33 INFO mapred.JobClient:     Data-local map tasks=1
14/01/01 21:51:33 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=10273
14/01/01 21:51:33 INFO mapred.JobClient:   File Output Format Counters 
14/01/01 21:51:33 INFO mapred.JobClient:     Bytes Written=26854
14/01/01 21:51:33 INFO mapred.JobClient:   FileSystemCounters
14/01/01 21:51:33 INFO mapred.JobClient:     FILE_BYTES_READ=4539170
14/01/01 21:51:33 INFO mapred.JobClient:     HDFS_BYTES_READ=5326654
14/01/01 21:51:33 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=9193733
14/01/01 21:51:33 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=26854
14/01/01 21:51:33 INFO mapred.JobClient:   File Input Format Counters 
14/01/01 21:51:33 INFO mapred.JobClient:     Bytes Read=5326536
14/01/01 21:51:33 INFO mapred.JobClient:   Map-Reduce Framework
14/01/01 21:51:33 INFO mapred.JobClient:     Map output materialized bytes=4539170
14/01/01 21:51:33 INFO mapred.JobClient:     Map input records=122574
14/01/01 21:51:33 INFO mapred.JobClient:     Reduce shuffle bytes=4539170
14/01/01 21:51:33 INFO mapred.JobClient:     Spilled Records=245148
14/01/01 21:51:33 INFO mapred.JobClient:     Map output bytes=4294016
14/01/01 21:51:33 INFO mapred.JobClient:     Total committed heap usage (bytes)=177016832
14/01/01 21:51:33 INFO mapred.JobClient:     CPU time spent (ms)=1810
14/01/01 21:51:33 INFO mapred.JobClient:     Combine input records=0
14/01/01 21:51:33 INFO mapred.JobClient:     SPLIT_RAW_BYTES=118
14/01/01 21:51:33 INFO mapred.JobClient:     Reduce input records=122574
14/01/01 21:51:33 INFO mapred.JobClient:     Reduce input groups=524
14/01/01 21:51:33 INFO mapred.JobClient:     Combine output records=0
14/01/01 21:51:33 INFO mapred.JobClient:     Physical memory (bytes) snapshot=230092800
14/01/01 21:51:33 INFO mapred.JobClient:     Reduce output records=524
14/01/01 21:51:33 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=771518464
14/01/01 21:51:33 INFO mapred.JobClient:     Map output records=122574

Let us list all the file(s) under the /data/output directory of HDFS by issuing the following command:

hadoop fs -ls /data/output

The following will be the output:

Output-5

Found 3 items
-rw-r--r--   2 hadoop supergroup          0 2014-01-01 21:51 /data/output/_SUCCESS
drwxr-xr-x   - hadoop supergroup          0 2014-01-01 21:51 /data/output/_logs
-rw-r--r--   2 hadoop supergroup      26854 2014-01-01 21:51 /data/output/part-r-00000

From Output-5 above, we see a file named _SUCCESS. The presence of this file indicates that the Stock Analysis Hadoop MapReduce program completed successfully.

The stock analysis results will be in the file named part-r-00000.

We are not going to display the entire contents of this file; rather, we will peek at its tail. Issue the following command to tail the contents of the file /data/output/part-r-00000 in HDFS:

hadoop fs -tail /data/output/part-r-00000

The following will be the output:

Output-6

WMT	WMT,20090825,1.35,266104,20100104,626.75,19579
WPI	WPI,20090825,1.35,266104,20100104,626.75,19579
WPO	WPO,20090825,1.35,266104,20100104,626.75,19579
WU	WU,20090825,1.35,266104,20100104,626.75,19579
WY	WY,20090825,1.35,266104,20100104,626.75,19579
WYE	WYE,20090825,1.35,266104,20100104,626.75,19579
WYN	WYN,20090825,1.35,266104,20100104,626.75,19579
WYNN	WYNN,20090825,1.35,266104,20100104,626.75,19579
X	X,20090825,1.35,266104,20100104,626.75,19579
XEL	XEL,20090825,1.35,266104,20100104,626.75,19579
XL	XL,20090825,1.35,266104,20100104,626.75,19579
XLNX	XLNX,20090825,1.35,266104,20100104,626.75,19579
XOM	XOM,20090825,1.35,266104,20100104,626.75,19579
XRAY	XRAY,20090825,1.35,266104,20100104,626.75,19579
XRX	XRX,20090825,1.35,266104,20100104,626.75,19579
XTO	XTO,20090825,1.35,266104,20100104,626.75,19579
YHOO	YHOO,20090825,1.35,266104,20100104,626.75,19579
YUM	YUM,20090825,1.35,266104,20100104,626.75,19579
ZION	ZION,20090825,1.35,266104,20100104,626.75,19579
ZMH	ZMH,20090825,1.35,266104,20100104,626.75,19579

References

Hadoop Quick Notes :: Part - 1

Hadoop Quick Notes :: Part - 2

Hadoop Quick Notes :: Part - 3

Hadoop Quick Notes :: Part - 4