
Get Started Analyzing Data with Hadoop

Posted by tomwhite, Oct 28 2009 11:14 AM

To take advantage of the parallel processing that Hadoop provides, we need to express our query as a MapReduce job. After some local, small-scale testing, we will be able to run it on a cluster of machines.

Map and Reduce

MapReduce works by breaking the processing into two phases: the map phase and the reduce phase. Each phase has key-value pairs as input and output, the types of which may be chosen by the programmer. The programmer also specifies two functions: the map function and the reduce function.
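
In general terms, the two functions have the following shapes, where K and V stand for arbitrary key and value types (a notational sketch; the concrete types for our example appear below):

map:    (K1, V1)        →  list(K2, V2)
reduce: (K2, list(V2))  →  list(K3, V3)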

The input to our map phase is the raw NCDC data. We choose a text input format that gives us each line in the dataset as a text value. The key is the offset of the beginning of the line from the beginning of the file, but as we have no need for this, we ignore it.

Our map function is simple. We pull out the year and the air temperature, since these are the only fields we are interested in. In this case, the map function is just a data preparation phase, setting up the data in such a way that the reducer function can do its work on it: finding the maximum temperature for each year. The map function is also a good place to drop bad records: here we filter out temperatures that are missing, suspect, or erroneous.

To visualize the way the map works, consider the following sample lines of input data (some unused columns have been dropped to fit the page, indicated by ellipses):

0067011990999991950051507004...9999999N9+00001+99999999999...

0043011990999991950051512004...9999999N9+00221+99999999999...

0043011990999991950051518004...9999999N9-00111+99999999999...

0043012650999991949032412004...0500001N9+01111+99999999999...

0043012650999991949032418004...0500001N9+00781+99999999999...

These lines are presented to the map function as the key-value pairs:

(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)

(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)

(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)

(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)

(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)

The keys are the line offsets within the file, which we ignore in our map function. The map function merely extracts the year and the air temperature, and emits them as its output. (The temperature values have been interpreted as integers.)

(1950, 0)

(1950, 22)

(1950, −11)

(1949, 111)

(1949, 78)

The output from the map function is processed by the MapReduce framework before being sent to the reduce function. This processing sorts and groups the key-value pairs by key. So, continuing the example, our reduce function sees the following input:

(1949, [111, 78])

(1950, [0, 22, −11])

Each year appears with a list of all its air temperature readings. All the reduce function has to do now is iterate through the list and pick up the maximum reading:

(1949, 111)

(1950, 22)

This is the final output: the maximum global temperature recorded in each year.
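
To make the sort-and-group step between the map output and the reduce input concrete, here is a plain-Java sketch of my own (this is not how Hadoop implements the shuffle, just an illustration of the grouping):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleSketch {
  public static void main(String[] args) {
    // The map output pairs from the example above
    int[][] mapOutput = { {1950, 0}, {1950, 22}, {1950, -11}, {1949, 111}, {1949, 78} };
    // Group the values by key, with the keys kept in sorted order
    Map<Integer, List<Integer>> grouped = new TreeMap<Integer, List<Integer>>();
    for (int[] pair : mapOutput) {
      if (!grouped.containsKey(pair[0])) {
        grouped.put(pair[0], new ArrayList<Integer>());
      }
      grouped.get(pair[0]).add(pair[1]);
    }
    System.out.println(grouped); // {1949=[111, 78], 1950=[0, 22, -11]}
  }
}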

The whole data flow is illustrated in Figure 2.1. At the bottom of the diagram is a Unix pipeline, which mimics the whole MapReduce flow, and which we will see again later in the chapter when we look at Hadoop Streaming.

Figure 2.1. MapReduce logical data flow


Java MapReduce

Having run through how the MapReduce program works, the next step is to express it in code. We need three things: a map function, a reduce function, and some code to run the job. The map function is represented by an implementation of the Mapper interface, which declares a map() method. Example 2.3 shows the implementation of our map function.

Example 2.3. Mapper for maximum temperature example

import java.io.IOException;



import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reporter;



public class MaxTemperatureMapper extends MapReduceBase

  implements Mapper<LongWritable, Text, Text, IntWritable> {



  private static final int MISSING = 9999;

  

  public void map(LongWritable key, Text value,

      OutputCollector<Text, IntWritable> output, Reporter reporter)

      throws IOException {

    

    String line = value.toString();

    String year = line.substring(15, 19);

    int airTemperature;

    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs

      airTemperature = Integer.parseInt(line.substring(88, 92));

    } else {

      airTemperature = Integer.parseInt(line.substring(87, 92));

    }

    String quality = line.substring(92, 93);

    if (airTemperature != MISSING && quality.matches("[01459]")) {

      output.collect(new Text(year), new IntWritable(airTemperature));

    }

  }

}


The Mapper interface is a generic type, with four formal type parameters that specify the input key, input value, output key, and output value types of the map function. For the present example, the input key is a long integer offset, the input value is a line of text, the output key is a year, and the output value is an air temperature (an integer). Rather than use built-in Java types, Hadoop provides its own set of basic types that are optimized for network serialization. These are found in the org.apache.hadoop.io package. Here we use LongWritable, which corresponds to a Java Long, Text (like Java String), and IntWritable (like Java Integer).
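
As a quick standalone illustration (not from the book) of how these wrapper types behave:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableTypesDemo {
  public static void main(String[] args) {
    LongWritable offset = new LongWritable(0L); // like a Java Long
    Text year = new Text("1950");               // like a Java String
    IntWritable temp = new IntWritable(22);     // like a Java Integer
    // The wrapped values are retrieved with get() or toString()
    System.out.println(offset.get() + "\t" + year.toString() + "\t" + temp.get());
  }
}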

The map() method is passed a key and a value. We convert the Text value containing the line of input into a Java String, then use its substring() method to extract the columns we are interested in.

The map() method also provides an instance of OutputCollector to write the output to. In this case, we write the year as a Text object (since we are just using it as a key), and the temperature wrapped in an IntWritable. We write an output record only if the temperature is present and the quality code indicates the temperature reading is OK.

The reduce function is similarly defined using a Reducer, as illustrated in Example 2.4.

Example 2.4. Reducer for maximum temperature example

import java.io.IOException;

import java.util.Iterator;



import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;



public class MaxTemperatureReducer extends MapReduceBase

  implements Reducer<Text, IntWritable, Text, IntWritable> {



  public void reduce(Text key, Iterator<IntWritable> values,

      OutputCollector<Text, IntWritable> output, Reporter reporter)

      throws IOException {

    

    int maxValue = Integer.MIN_VALUE;

    while (values.hasNext()) {

      maxValue = Math.max(maxValue, values.next().get());

    }

    output.collect(key, new IntWritable(maxValue));

  }

}


Again, four formal type parameters are used to specify the input and output types, this time for the reduce function. The input types of the reduce function must match the output type of the map function: Text and IntWritable. And in this case, the output types of the reduce function are Text and IntWritable, for a year and its maximum temperature, which we find by iterating through the temperatures and comparing each with a record of the highest found so far.

The third piece of code runs the MapReduce job (see Example 2.5).

Example 2.5. Application to find the maximum temperature in the weather dataset

import java.io.IOException;



import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;



public class MaxTemperature {



  public static void main(String[] args) throws IOException {

    if (args.length != 2) {

      System.err.println("Usage: MaxTemperature  ");

      System.exit(-1);

    }

    

    JobConf conf = new JobConf(MaxTemperature.class);

    conf.setJobName("Max temperature");



    FileInputFormat.addInputPath(conf, new Path(args[0]));

    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    

    conf.setMapperClass(MaxTemperatureMapper.class);

    conf.setReducerClass(MaxTemperatureReducer.class);



    conf.setOutputKeyClass(Text.class);

    conf.setOutputValueClass(IntWritable.class);



    JobClient.runJob(conf);

  }

}


A JobConf object forms the specification of the job. It gives you control over how the job is run. When we run this job on a Hadoop cluster, we will package the code into a JAR file (which Hadoop will distribute round the cluster). Rather than explicitly specify the name of the JAR file, we can pass a class in the JobConf constructor, which Hadoop will use to locate the relevant JAR file by looking for the JAR file containing this class.
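
Alternatively, the JAR can be named explicitly; a sketch (the JAR filename here is made up for illustration):

// Equivalent in effect to passing MaxTemperature.class to the JobConf constructor
conf.setJar("max-temperature.jar"); // hypothetical filename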

Having constructed a JobConf object, we specify the input and output paths. An input path is specified by calling the static addInputPath() method on FileInputFormat, and it can be a single file, a directory (in which case, the input forms all the files in that directory), or a file pattern. As the name suggests, addInputPath() can be called more than once to use input from multiple paths.
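
For example, the single call in Example 2.5 could be replaced by several calls, or by the variadic setInputPaths() method; a sketch (the extra paths are made up for illustration):

// Hypothetical year-by-year input directories
FileInputFormat.addInputPath(conf, new Path("input/ncdc/1949"));
FileInputFormat.addInputPath(conf, new Path("input/ncdc/1950"));
// or, equivalently, set them all in one call
FileInputFormat.setInputPaths(conf,
    new Path("input/ncdc/1949"), new Path("input/ncdc/1950"));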

The output path (of which there is only one) is specified by the static setOutputPath() method on FileOutputFormat. It specifies a directory where the output files from the reducer functions are written. The directory shouldn’t exist before running the job, as Hadoop will complain and not run the job. This precaution is to prevent data loss (it can be very annoying to accidentally overwrite the output of a long job with another).
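
If you are re-running a job during development, one option (a sketch of my own, not from the book; deleting output is destructive, so use with care) is to remove any previous output before setting the output path:

// Requires import org.apache.hadoop.fs.FileSystem;
Path outputPath = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
  fs.delete(outputPath, true); // true = delete recursively
}
FileOutputFormat.setOutputPath(conf, outputPath);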

Next, we specify the map and reduce types to use via the setMapperClass() and setReducerClass() methods.

The setOutputKeyClass() and setOutputValueClass() methods control the output types for the map and the reduce functions, which are often the same, as they are in our case. If they are different, then the map output types can be set using the methods setMapOutputKeyClass() and setMapOutputValueClass().
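
For instance, a hypothetical job whose map emitted LongWritable values but whose reduce emitted Text values might be configured like this (a sketch, not part of the running example):

// Map output types: set explicitly only because they differ from the final types
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(LongWritable.class);
// Final (reduce) output types
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);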

The input types are controlled via the input format, which we have not explicitly set since we are using the default TextInputFormat.
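
Setting the input format explicitly would look like the following and is equivalent to the default (shown only as a sketch):

// Requires import org.apache.hadoop.mapred.TextInputFormat;
conf.setInputFormat(TextInputFormat.class);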

After setting the classes that define the map and reduce functions, we are ready to run the job. The static runJob() method on JobClient submits the job and waits for it to finish, writing information about its progress to the console.
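
runJob() also returns a RunningJob handle, which the driver could use to check the outcome; a small sketch (a variation on the last line of Example 2.5, not from the book):

// Requires import org.apache.hadoop.mapred.RunningJob;
RunningJob running = JobClient.runJob(conf); // blocks until the job completes
if (!running.isSuccessful()) {
  System.exit(1); // signal failure to the caller
}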

A test run

After writing a MapReduce job, it’s normal to try it out on a small dataset to flush out any immediate problems with the code. First install Hadoop in standalone mode. This is the mode in which Hadoop runs using the local filesystem with a local job runner. Let’s test it on the five-line sample discussed earlier (the output has been slightly reformatted to fit the page):

% export HADOOP_CLASSPATH=build/classes

% hadoop MaxTemperature input/ncdc/sample.txt output

09/04/07 12:34:35 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=Job

Tracker, sessionId=

09/04/07 12:34:35 WARN mapred.JobClient: Use GenericOptionsParser for parsing the 

arguments. Applications should implement Tool for the same.

09/04/07 12:34:35 WARN mapred.JobClient: No job jar file set.  User classes may not 

be found. See JobConf(Class) or JobConf#setJar(String).

09/04/07 12:34:35 INFO mapred.FileInputFormat: Total input paths to process : 1

09/04/07 12:34:35 INFO mapred.JobClient: Running job: job_local_0001

09/04/07 12:34:35 INFO mapred.FileInputFormat: Total input paths to process : 1

09/04/07 12:34:35 INFO mapred.MapTask: numReduceTasks: 1

09/04/07 12:34:35 INFO mapred.MapTask: io.sort.mb = 100

09/04/07 12:34:35 INFO mapred.MapTask: data buffer = 79691776/99614720

09/04/07 12:34:35 INFO mapred.MapTask: record buffer = 262144/327680

09/04/07 12:34:35 INFO mapred.MapTask: Starting flush of map output

09/04/07 12:34:36 INFO mapred.MapTask: Finished spill 0

09/04/07 12:34:36 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is 

done. And is in the process of commiting

09/04/07 12:34:36 INFO mapred.LocalJobRunner: file:/Users/tom/workspace/htdg/input/n

cdc/sample.txt:0+529

09/04/07 12:34:36 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.

09/04/07 12:34:36 INFO mapred.LocalJobRunner: 

09/04/07 12:34:36 INFO mapred.Merger: Merging 1 sorted segments

09/04/07 12:34:36 INFO mapred.Merger: Down to the last merge-pass, with 1 segments 

left of total size: 57 bytes

09/04/07 12:34:36 INFO mapred.LocalJobRunner: 

09/04/07 12:34:36 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done

. And is in the process of commiting

09/04/07 12:34:36 INFO mapred.LocalJobRunner: 

09/04/07 12:34:36 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is 

allowed to commit now

09/04/07 12:34:36 INFO mapred.FileOutputCommitter: Saved output of task 

'attempt_local_0001_r_000000_0' to file:/Users/tom/workspace/htdg/output

09/04/07 12:34:36 INFO mapred.LocalJobRunner: reduce > reduce

09/04/07 12:34:36 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.

09/04/07 12:34:36 INFO mapred.JobClient:  map 100% reduce 100%

09/04/07 12:34:36 INFO mapred.JobClient: Job complete: job_local_0001

09/04/07 12:34:36 INFO mapred.JobClient: Counters: 13

09/04/07 12:34:36 INFO mapred.JobClient:   FileSystemCounters

09/04/07 12:34:36 INFO mapred.JobClient:     FILE_BYTES_READ=27571

09/04/07 12:34:36 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=53907

09/04/07 12:34:36 INFO mapred.JobClient:   Map-Reduce Framework

09/04/07 12:34:36 INFO mapred.JobClient:     Reduce input groups=2

09/04/07 12:34:36 INFO mapred.JobClient:     Combine output records=0

09/04/07 12:34:36 INFO mapred.JobClient:     Map input records=5

09/04/07 12:34:36 INFO mapred.JobClient:     Reduce shuffle bytes=0

09/04/07 12:34:36 INFO mapred.JobClient:     Reduce output records=2

09/04/07 12:34:36 INFO mapred.JobClient:     Spilled Records=10

09/04/07 12:34:36 INFO mapred.JobClient:     Map output bytes=45

09/04/07 12:34:36 INFO mapred.JobClient:     Map input bytes=529

09/04/07 12:34:36 INFO mapred.JobClient:     Combine input records=0

09/04/07 12:34:36 INFO mapred.JobClient:     Map output records=5

09/04/07 12:34:36 INFO mapred.JobClient:     Reduce input records=5

When the hadoop command is invoked with a classname as the first argument, it launches a JVM to run the class. It is more convenient to use hadoop than straight java since the former adds the Hadoop libraries (and their dependencies) to the classpath, and picks up the Hadoop configuration too. To add the application classes to the classpath, we’ve defined an environment variable called HADOOP_CLASSPATH, which the hadoop script picks up.

Note

When running in local (standalone) mode, the programs in this book all assume that you have set the HADOOP_CLASSPATH in this way. The commands should be run from the directory that the example code is installed in.

The output from running the job provides some useful information. (The warning about the job JAR file not being found is expected, since we are running in local mode without a JAR. We won’t see this warning when we run on a cluster.) For example, we can see that the job was given an ID of job_local_0001, and it ran one map task and one reduce task (with the IDs attempt_local_0001_m_000000_0 and attempt_local_0001_r_000000_0). Knowing the job and task IDs can be very useful when debugging MapReduce jobs.

The last section of the output, entitled “Counters,” shows the statistics that Hadoop generates for each job it runs. These are very useful for checking whether the amount of data processed is what you expected. For example, we can follow the number of records that went through the system: five map inputs produced five map outputs, then five reduce inputs in two groups produced two reduce outputs.

The output was written to the output directory, which contains one output file per reducer. The job had a single reducer, so we find a single file, named part-00000:

% cat output/part-00000

1949	111

1950	22

This result is the same as when we went through it by hand earlier. We interpret this as saying that the maximum temperature recorded in 1949 was 11.1°C, and in 1950 it was 2.2°C.
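
The dataset stores temperatures in tenths of a degree Celsius, so turning a reduce output value into degrees is a simple division; a trivial sketch:

int maxValue = 111;               // the reduce output for 1949
double celsius = maxValue / 10.0; // 11.1°C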

The new Java MapReduce API

Release 0.20.0 of Hadoop included a new Java MapReduce API, sometimes referred to as “Context Objects,” designed to make the API easier to evolve in the future. The new API is type-incompatible with the old, however, so applications need to be rewritten to take advantage of it.[18]

There are several notable differences between the two APIs:

  • The new API favors abstract classes over interfaces, since these are easier to evolve. For example, you can add a method (with a default implementation) to an abstract class without breaking old implementations of the class. In the new API, the Mapper and Reducer interfaces are now abstract classes.

  • The new API is in the org.apache.hadoop.mapreduce package (and subpackages). The old API is found in org.apache.hadoop.mapred.

  • The new API makes extensive use of context objects that allow the user code to communicate with the MapReduce system. The MapContext, for example, essentially unifies the role of the JobConf, the OutputCollector, and the Reporter.

  • The new API supports both a “push” and a “pull” style of iteration. In both APIs, key-value record pairs are pushed to the mapper, but in addition, the new API allows a mapper to pull records from within the map() method. The same goes for the reducer. An example of how the “pull” style can be useful is processing records in batches, rather than one by one.

  • Configuration has been unified. The old API has a special JobConf object for job configuration, which is an extension of Hadoop’s vanilla Configuration object. In the new API, this distinction is dropped, so job configuration is done through a Configuration.

  • Job control is performed through the Job class, rather than JobClient, which no longer exists in the new API.

Example 2.6 shows the MaxTemperature application rewritten to use the new API; compare it with Example 2.5 to see where the two APIs differ.

Example 2.6. Application to find the maximum temperature in the weather dataset using the new context objects MapReduce API

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewMaxTemperature {

  

  static class NewMaxTemperatureMapper

    extends Mapper<LongWritable, Text, Text, IntWritable> {



    private static final int MISSING = 9999;

    

    public void map(LongWritable key, Text value, Context context)

        throws IOException, InterruptedException {

      

      String line = value.toString();

      String year = line.substring(15, 19);

      int airTemperature;

      if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs

        airTemperature = Integer.parseInt(line.substring(88, 92));

      } else {

        airTemperature = Integer.parseInt(line.substring(87, 92));

      }

      String quality = line.substring(92, 93);

      if (airTemperature != MISSING && quality.matches("[01459]")) {

        context.write(new Text(year), new IntWritable(airTemperature));

      }

    }

  }

  

  static class NewMaxTemperatureReducer

    extends Reducer<Text, IntWritable, Text, IntWritable> {

  

    public void reduce(Text key, Iterable<IntWritable> values,

        Context context)

        throws IOException, InterruptedException {

      

      int maxValue = Integer.MIN_VALUE;

      for (IntWritable value : values) {

        maxValue = Math.max(maxValue, value.get());

      }

      context.write(key, new IntWritable(maxValue));

    }

  }



  public static void main(String[] args) throws Exception {

    if (args.length != 2) {

      System.err.println("Usage: NewMaxTemperature  ");

      System.exit(-1);

    }

    

    Job job = new Job();

    job.setJarByClass(NewMaxTemperature.class);



    FileInputFormat.addInputPath(job, new Path(args[0]));

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    

    job.setMapperClass(NewMaxTemperatureMapper.class);

    job.setReducerClass(NewMaxTemperatureReducer.class);



    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(IntWritable.class);

    

    System.exit(job.waitForCompletion(true) ? 0 : 1);

  }

}


[18] At the time of this writing, not all of the MapReduce libraries in Hadoop have been ported to work with the new API. This book uses the old API for this reason. However, a copy of all of the examples in this book, rewritten to use the new API, will be made available on the book’s website.

Hadoop: The Definitive Guide

Learn more about this topic from Hadoop: The Definitive Guide.

Apache Hadoop is ideal for organizations with a growing need to process massive application datasets. Hadoop: The Definitive Guide is a comprehensive resource for using Hadoop to build reliable, scalable, distributed systems. Programmers will find details for analyzing large datasets with Hadoop, and administrators will learn how to set up and run Hadoop clusters. The book includes case studies that illustrate how Hadoop is used to solve specific problems.



6 Replies

Posted by mvensky, Jan 29 2011 08:43 AM

I have the book, have Hadoop setup on a RHEL box, and am now trying to get the weather data.

In the book's index it mentions the data as available on Amazon EBS. I don't want to set up an Amazon EC2 instance at this time, preferring to download the data directly.

Having gone to the ncdc.noaa.gov website, I quickly got lost in the blizzard of datasets. Do you have any more guidance as to the precise datasets you are referring to in the text? A more specific URL would help.
Posted by capo970, Jun 18 2011 03:08 AM

Thank you for the clarification. I find the book very helpful and have learned a lot from it.
@mvensky: I have the same problem. I don't know how I can download the data from Germany; I also got lost in the blizzard of datasets on the ncdc.noaa.gov site.

Please help me get the original data for testing.
Best regards,
walid
Posted by Sakthivel Murugasamy, Jul 09 2011 12:41 AM

The book "Hadoop - The Definitive Guide" is definitely a useful book and is written by an expert Tom White & reviewed by Experts.

Thanks,
Sakthivel
Posted Sep 03 2011 02:33 AM
Would someone point me to where the data is?
I can't find it on S3, and I can't find it on NCDC.

This sucks big time.
Posted by Valon, Dec 27 2011 10:18 AM

You get a subset of the weather data when you download the book's source code. See the book's site http://www.hadoopbook.com/ and the GitHub link http://github.com/to...te/hadoop-book/
The input/metoffice folder has weather data.
Posted by @ndrest, Nov 15 2012 09:57 AM

I am a newbie with Hadoop. I am working on a project similar to the example: weather data and Hadoop to execute some processing in parallel. The difference is that I am working with Cassandra to store the data. The problem I have is that I want to debug my application. Sorry, maybe it's a stupid question, but I can't find out how to do it yet. I am working with the NetBeans IDE. I want to run my application step by step to see what is going on inside my map and reduce functions. Let's say that I want to debug my map and reduce functions and test whether they are doing what they are supposed to do. I can only get to the end of the run method. When I reach the line
JobClient.runJob(conf); I cannot continue debugging any further.

Please, any help would be really appreciated.