To take advantage of the parallel processing that Hadoop provides, we need to express our query as a MapReduce job. After some local, small-scale testing, we will be able to run it on a cluster of machines.
MapReduce works by breaking the processing into two phases: the map phase and the reduce phase. Each phase has key-value pairs as input and output, the types of which may be chosen by the programmer. The programmer also specifies two functions: the map function and the reduce function.
The input to our map phase is the raw NCDC data. We choose a text input format that gives us each line in the dataset as a text value. The key is the offset of the beginning of the line from the beginning of the file, but as we have no need for this, we ignore it.
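To make the key concrete, here is a minimal plain-Java sketch that pairs each line with the byte offset at which it starts. It is not Hadoop's record reader. It assumes UTF-8 text with single `\n` terminators, and the class and method names are hypothetical. In the sample pairs later in this section the keys advance by 106, which means each full record occupies 106 bytes including its newline.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper that mimics the (offset, line) records of a text input format.
class LineOffsets {

    // Pairs each '\n'-terminated line with the byte offset at which it starts.
    static List<Map.Entry<Long, String>> withOffsets(String text) {
        int totalBytes = text.getBytes(StandardCharsets.UTF_8).length;
        List<Map.Entry<Long, String>> records = new ArrayList<>();
        long offset = 0;
        for (String line : text.split("\n", -1)) {
            if (offset == totalBytes && line.isEmpty()) {
                break; // a trailing newline does not start another record
            }
            records.add(Map.entry(offset, line));
            // Skip over the line's bytes plus its newline terminator
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return records;
    }

    public static void main(String[] args) {
        for (Map.Entry<Long, String> record : withOffsets("ab\ncde\nf\n")) {
            System.out.println("(" + record.getKey() + ", " + record.getValue() + ")");
        }
    }
}
```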
Our map function is simple. We pull out the year and the air temperature, since these are the only fields we are interested in. In this case, the map function is just a data preparation phase, setting up the data in such a way that the reducer function can do its work on it: finding the maximum temperature for each year. The map function is also a good place to drop bad records: here we filter out temperatures that are missing, suspect, or erroneous.
To visualize the way the map works, consider the following sample lines of input data (some unused columns have been dropped to fit the page, indicated by ellipses):
0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...
These lines are presented to the map function as the key-value pairs:
(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
The keys are the line offsets within the file, which we ignore in our map function. The map function merely extracts the year (for example, 1950) and the signed air temperature field (for example, +0022), and emits them as its output. (The temperature values have been interpreted as integers.)
(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)
The output from the map function is processed by the MapReduce framework before being sent to the reduce function. This processing sorts and groups the key-value pairs by key. So, continuing the example, our reduce function sees the following input:
(1949, [111, 78])
(1950, [0, 22, −11])
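The sort-and-group step can be imitated in memory with a sorted map. The following is a minimal sketch with hypothetical names, not the framework's implementation. Note that Hadoop does not guarantee the order of values within a group; this sketch simply keeps them in arrival order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the framework's sort-and-group step.
class ShuffleSketch {

    // Groups (year, temperature) pairs by year; TreeMap keeps the years sorted.
    static TreeMap<Integer, List<Integer>> group(List<Map.Entry<Integer, Integer>> mapOutput) {
        TreeMap<Integer, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<Integer, Integer> pair : mapOutput) {
            grouped.computeIfAbsent(pair.getKey(), year -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, Integer>> mapOutput = List.of(
            Map.entry(1950, 0), Map.entry(1950, 22), Map.entry(1950, -11),
            Map.entry(1949, 111), Map.entry(1949, 78));
        System.out.println(group(mapOutput)); // {1949=[111, 78], 1950=[0, 22, -11]}
    }
}
```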
Each year appears with a list of all its air temperature readings. All the reduce function has to do now is iterate through the list and pick up the maximum reading:
(1949, 111)
(1950, 22)
This is the final output: the maximum global temperature recorded in each year.
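The reduce step is just a running maximum over each year's list. A minimal plain-Java sketch follows. The class and method names are hypothetical, and the grouped input is assumed to be already sorted by year.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the reduce step for the maximum-temperature example.
class MaxReduceSketch {

    // Scans one year's readings and keeps the largest seen so far.
    static int maxReading(List<Integer> readings) {
        int max = Integer.MIN_VALUE;
        for (int reading : readings) {
            max = Math.max(max, reading);
        }
        return max;
    }

    // Applies the reduce step to every group, preserving sorted key order.
    static TreeMap<Integer, Integer> reduceAll(Map<Integer, List<Integer>> grouped) {
        TreeMap<Integer, Integer> result = new TreeMap<>();
        for (Map.Entry<Integer, List<Integer>> group : grouped.entrySet()) {
            result.put(group.getKey(), maxReading(group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> grouped = new TreeMap<>(Map.of(
            1949, List.of(111, 78),
            1950, List.of(0, 22, -11)));
        System.out.println(reduceAll(grouped)); // {1949=111, 1950=22}
    }
}
```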
The whole data flow is illustrated in Figure 2.1. At the bottom of the diagram is a Unix pipeline, which mimics the whole MapReduce flow, and which we will see again later in the chapter when we look at Hadoop Streaming.
Having run through how the MapReduce program works, we can now express it in code. We need three things: a map function, a reduce function, and some code to run the job. The map function is represented by an implementation of the Mapper interface, which declares a map() method. Example 2.3 shows the implementation of our map function.
Example 2.3. Mapper for maximum temperature example
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  // Sentinel value the dataset uses for a missing temperature
  private static final int MISSING = 9999;

  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {

    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // Integer.parseInt rejected a leading '+' before Java 7
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    // Emit only readings that are present and have an acceptable quality code
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      output.collect(new Text(year), new IntWritable(airTemperature));
    }
  }
}
The Mapper interface is a
generic type, with four formal type parameters that specify the input
key, input value, output key, and output value types of the map
function. For the present example, the input key is a long integer
offset, the input value is a line of text, the output key is a year,
and the output value is an air temperature (an integer). Rather than
use built-in Java types, Hadoop provides its own set of basic types
that are optimized for network serialization. These are found in the
org.apache.hadoop.io package. Here
we use LongWritable, which
corresponds to a Java Long,
Text (like Java String), and IntWritable (like Java Integer).
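The compactness point can be illustrated with JDK classes alone. IntWritable writes its value with DataOutput.writeInt (four bytes) and LongWritable with writeLong (eight bytes). The sketch below compares that raw encoding with standard Java object serialization of a boxed Integer. It uses only java.io, not Hadoop, so it shows the idea rather than Hadoop's own code; the class name is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

class SerializationSizes {

    // Bytes produced by DataOutput.writeInt: a fixed four-byte binary encoding.
    static int dataOutputSize(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Bytes produced by standard Java serialization of a boxed Integer, which
    // adds a stream header and class metadata around the value.
    static int javaSerializedSize(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(Integer.valueOf(value));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        System.out.println("writeInt:           " + dataOutputSize(111) + " bytes");
        System.out.println("ObjectOutputStream: " + javaSerializedSize(111) + " bytes");
    }
}
```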
The map() method is passed a
key and a value. We convert the Text value containing the line of input into
a Java String, then use its
substring() method to extract the
columns we are interested in.
The map() method also
provides an instance of OutputCollector to write the output to. In
this case, we write the year as a Text object (since we are just using it as a
key), and the temperature wrapped in an IntWritable. We write an output record only
if the temperature is present and the quality code indicates the
temperature reading is OK.
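One way to test this logic without a cluster is to move the parsing into a plain class. The mapper then becomes a thin wrapper around it. The sketch below is a hypothetical helper that applies the same column offsets and filtering rules as MaxTemperatureMapper. Because the sample lines above are abbreviated, it builds synthetic fixed-width lines (not real NCDC records) for the test.

```java
import java.util.Optional;

// Hypothetical parser reusing the mapper's column offsets and quality filter.
class NcdcLineParser {

    private static final int MISSING = 9999;

    // The two fields the map function cares about.
    static final class Reading {
        final String year;
        final int airTemperature;

        Reading(String year, int airTemperature) {
            this.year = year;
            this.airTemperature = airTemperature;
        }
    }

    // Returns an empty Optional for missing or poor-quality readings.
    static Optional<Reading> parse(String line) {
        String year = line.substring(15, 19);
        // Strip a leading '+', which Integer.parseInt rejected before Java 7
        String temp = line.charAt(87) == '+' ? line.substring(88, 92) : line.substring(87, 92);
        int airTemperature = Integer.parseInt(temp);
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            return Optional.of(new Reading(year, airTemperature));
        }
        return Optional.empty();
    }

    // Builds a synthetic 93-character line with the year at columns 15-18,
    // the signed temperature at 87-91, and the quality code at 92.
    static String syntheticLine(String year, String signedTemp, char quality) {
        StringBuilder sb = new StringBuilder("0".repeat(93));
        sb.replace(15, 19, year);
        sb.replace(87, 92, signedTemp);
        sb.setCharAt(92, quality);
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(parse(syntheticLine("1950", "+0022", '1'))
            .map(r -> r.year + " " + r.airTemperature).orElse("dropped")); // 1950 22
        System.out.println(parse(syntheticLine("1950", "+9999", '1')).isPresent()); // false
    }
}
```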
The reduce function is similarly defined using a Reducer, as illustrated in Example 2.4.
Example 2.4. Reducer for maximum temperature example
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {

    // Track the highest temperature seen so far for this year
    int maxValue = Integer.MIN_VALUE;
    while (values.hasNext()) {
      maxValue = Math.max(maxValue, values.next().get());
    }
    output.collect(key, new IntWritable(maxValue));
  }
}
Again, four formal type parameters are used to specify the input
and output types, this time for the reduce function. The input types
of the reduce function must match the output types of the map function:
Text and IntWritable. And in this case, the output
types of the reduce function are Text and IntWritable, for a year and its maximum
temperature, which we find by iterating through the temperatures and
comparing each with a record of the highest found so far.
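The type-matching rule can be expressed directly with Java generics. In the sketch below, the mapper's output type parameters K2 and V2 are the reducer's input type parameters, so the compiler rejects a mismatched pair. MapFn, ReduceFn, and run() are hypothetical simplifications for illustration, not Hadoop's interfaces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

class TypedPipeline {

    // Hypothetical, simplified stand-ins for the Mapper and Reducer contracts.
    interface MapFn<K1, V1, K2, V2> {
        void map(K1 key, V1 value, BiConsumer<K2, V2> out);
    }

    interface ReduceFn<K2, V2, K3, V3> {
        void reduce(K2 key, List<V2> values, BiConsumer<K3, V3> out);
    }

    // K2 and V2 appear in both parameters, so a reducer whose input types
    // differ from the mapper's output types does not compile.
    static <K1, V1, K2 extends Comparable<K2>, V2, K3, V3> List<Map.Entry<K3, V3>> run(
            List<Map.Entry<K1, V1>> input,
            MapFn<K1, V1, K2, V2> mapper,
            ReduceFn<K2, V2, K3, V3> reducer) {
        TreeMap<K2, List<V2>> groups = new TreeMap<>();
        for (Map.Entry<K1, V1> record : input) {
            mapper.map(record.getKey(), record.getValue(),
                (k, v) -> groups.computeIfAbsent(k, x -> new ArrayList<>()).add(v));
        }
        List<Map.Entry<K3, V3>> output = new ArrayList<>();
        for (Map.Entry<K2, List<V2>> group : groups.entrySet()) {
            reducer.reduce(group.getKey(), group.getValue(), (k, v) -> output.add(Map.entry(k, v)));
        }
        return output;
    }

    public static void main(String[] args) {
        // For brevity, each input line here is a pre-extracted "year,temperature" string.
        List<Map.Entry<Long, String>> input = List.of(
            Map.entry(0L, "1950,0"), Map.entry(1L, "1950,22"), Map.entry(2L, "1949,111"));
        MapFn<Long, String, String, Integer> mapper = (offset, line, out) -> {
            String[] parts = line.split(",");
            out.accept(parts[0], Integer.parseInt(parts[1]));
        };
        ReduceFn<String, Integer, String, Integer> reducer = (year, temps, out) ->
            out.accept(year, temps.stream().mapToInt(Integer::intValue).max().getAsInt());
        System.out.println(run(input, mapper, reducer)); // [1949=111, 1950=22]
    }
}
```

In Hadoop itself, mapper and reducer classes are registered through Class objects (setMapperClass() and setReducerClass()), so a mismatch of this kind is generally not caught by the compiler and only surfaces when the job runs.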
The third piece of code runs the MapReduce job (see Example 2.5).
Example 2.5. Application to find the maximum temperature in the weather dataset
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class MaxTemperature {
  public static void main(String[] args) throws IOException {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }
    JobConf conf = new JobConf(MaxTemperature.class);
    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    conf.setMapperClass(MaxTemperatureMapper.class);
    conf.setReducerClass(MaxTemperatureReducer.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    JobClient.runJob(conf);
  }
}

A JobConf object forms the
specification of the job. It gives you control over how the job is
run. When we run this job on a Hadoop cluster, we will package the
code into a JAR file (which Hadoop will distribute round the cluster).
Rather than explicitly specify the name of the JAR file, we can pass a
class in the JobConf constructor,
which Hadoop will use to locate the relevant JAR file by looking for
the JAR file containing this class.
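The idea of locating a JAR from a class can be sketched with standard JDK calls. The sketch asks the class loader for the class's `.class` resource and, if the resource comes from a `jar:file:` URL, strips the URL down to the JAR path. This is a simplified illustration under those assumptions, not Hadoop's own lookup code. The helper names are hypothetical, and percent-escapes in paths are not decoded.

```java
import java.net.URL;
import java.util.Optional;

// Hypothetical sketch of finding the JAR that contains a given class.
class JarLocator {

    // Resource name of a class file, e.g. "org/example/MaxTemperature.class".
    static String classResourceName(Class<?> cls) {
        return cls.getName().replace('.', '/') + ".class";
    }

    // Extracts the JAR path from a URL such as
    // "jar:file:/path/to/job.jar!/MaxTemperature.class" (no percent-decoding).
    static Optional<String> jarPathFromUrl(String url) {
        String prefix = "jar:file:";
        int bang = url.indexOf("!/");
        if (!url.startsWith(prefix) || bang < 0) {
            return Optional.empty(); // loaded from a directory or some other source
        }
        return Optional.of(url.substring(prefix.length(), bang));
    }

    // Looks up where the class was loaded from, if it came from a JAR.
    static Optional<String> findContainingJar(Class<?> cls) {
        ClassLoader loader = cls.getClassLoader();
        if (loader == null) {
            return Optional.empty(); // bootstrap classes have no ordinary class loader
        }
        URL resource = loader.getResource(classResourceName(cls));
        return resource == null ? Optional.empty() : jarPathFromUrl(resource.toString());
    }

    public static void main(String[] args) {
        System.out.println(findContainingJar(JarLocator.class).orElse("not loaded from a JAR"));
    }
}
```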
Having constructed a JobConf
object, we specify the input and output paths. An input path is
specified by calling the static addInputPath() method on FileInputFormat, and it can be a single
file, a directory (in which case all the files in that directory
form the input), or a file pattern. As the name suggests, addInputPath() can be called more than once
to use input from multiple paths.
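To illustrate how a file pattern selects inputs, the sketch below filters candidate paths with Java's built-in glob matcher. Java's glob syntax is similar to, but not identical with, the patterns Hadoop accepts. The file names and the select() helper are hypothetical, and only path strings are matched, with no filesystem access.

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

class InputPatternSketch {

    // Keeps the candidate paths whose string form matches the glob; '*' does
    // not cross directory boundaries in Java's glob syntax.
    static List<Path> select(String glob, List<Path> candidates) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return candidates.stream().filter(matcher::matches).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Path> files = List.of(
            Paths.get("input/ncdc/1949.txt"),
            Paths.get("input/ncdc/1950.txt"),
            Paths.get("input/ncdc/README"),
            Paths.get("input/other/1950.txt"));
        System.out.println(select("input/ncdc/19*.txt", files));
    }
}
```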
The output path (of which there is only one) is specified by the
static setOutputPath() method on
FileOutputFormat. It specifies a
directory where the output files from the reducer functions are
written. The directory shouldn’t exist before running the job, as
Hadoop will complain and not run the job. This precaution is to
prevent data loss (it can be very annoying to accidentally overwrite
the output of a long job with another).
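The precaution amounts to a simple existence check before any work starts. Here is a minimal local-filesystem sketch using java.nio. The method name is hypothetical; Hadoop performs its own version of this check when the job is submitted.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class OutputDirCheck {

    // Refuses to proceed if the output directory already exists, so the
    // results of an earlier run are never overwritten.
    static void requireFreshOutputDir(Path output) {
        if (Files.exists(output)) {
            throw new IllegalStateException("Output directory " + output + " already exists");
        }
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("maxtemp");
        requireFreshOutputDir(base.resolve("output")); // passes: not created yet
        try {
            requireFreshOutputDir(base); // exists, so this is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```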
Next, we specify the map and reduce types to use via the
setMapperClass() and setReducerClass() methods.
The setOutputKeyClass() and
setOutputValueClass() methods
control the output types for the map and the reduce functions, which
are often the same, as they are in our case. If they are different,
then the map output types can be set using the methods setMapOutputKeyClass()
and setMapOutputValueClass().
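The fallback described above works like this: the map output type defaults to the job output type unless it is set explicitly. The sketch below shows that rule with a tiny hypothetical configuration class. The property names are illustrative only, and value classes would follow the same pattern as keys.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, minimal stand-in for a job configuration (keys only).
class OutputTypeConfig {

    private final Map<String, Class<?>> settings = new HashMap<>();

    void setOutputKeyClass(Class<?> cls) {
        settings.put("output.key.class", cls);
    }

    void setMapOutputKeyClass(Class<?> cls) {
        settings.put("map.output.key.class", cls);
    }

    Class<?> getOutputKeyClass() {
        return settings.get("output.key.class");
    }

    // Falls back to the job's output key type when no map output type is set.
    Class<?> getMapOutputKeyClass() {
        Class<?> explicit = settings.get("map.output.key.class");
        return explicit != null ? explicit : getOutputKeyClass();
    }

    public static void main(String[] args) {
        OutputTypeConfig conf = new OutputTypeConfig();
        conf.setOutputKeyClass(String.class);
        System.out.println(conf.getMapOutputKeyClass().getSimpleName()); // String
        conf.setMapOutputKeyClass(Integer.class);
        System.out.println(conf.getMapOutputKeyClass().getSimpleName()); // Integer
    }
}
```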
The input types are controlled via the input format, which we
have not explicitly set since we are using the default TextInputFormat.
After setting the classes that define the map and reduce
functions, we are ready to run the job. The static runJob() method on JobClient submits the job and waits for it
to finish, writing information about its progress to the
console.
After writing a MapReduce job, it’s normal to try it out on a small dataset to flush out any immediate problems with the code. First install Hadoop in standalone mode. This is the mode in which Hadoop runs using the local filesystem with a local job runner. Let’s test it on the five-line sample discussed earlier (the output has been slightly reformatted to fit the page):
% export HADOOP_CLASSPATH=build/classes
% hadoop MaxTemperature input/ncdc/sample.txt output
09/04/07 12:34:35 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=Job Tracker, sessionId=
09/04/07 12:34:35 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
09/04/07 12:34:35 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
09/04/07 12:34:35 INFO mapred.FileInputFormat: Total input paths to process : 1
09/04/07 12:34:35 INFO mapred.JobClient: Running job: job_local_0001
09/04/07 12:34:35 INFO mapred.FileInputFormat: Total input paths to process : 1
09/04/07 12:34:35 INFO mapred.MapTask: numReduceTasks: 1
09/04/07 12:34:35 INFO mapred.MapTask: io.sort.mb = 100
09/04/07 12:34:35 INFO mapred.MapTask: data buffer = 79691776/99614720
09/04/07 12:34:35 INFO mapred.MapTask: record buffer = 262144/327680
09/04/07 12:34:35 INFO mapred.MapTask: Starting flush of map output
09/04/07 12:34:36 INFO mapred.MapTask: Finished spill 0
09/04/07 12:34:36 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
09/04/07 12:34:36 INFO mapred.LocalJobRunner: file:/Users/tom/workspace/htdg/input/ncdc/sample.txt:0+529
09/04/07 12:34:36 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
09/04/07 12:34:36 INFO mapred.LocalJobRunner:
09/04/07 12:34:36 INFO mapred.Merger: Merging 1 sorted segments
09/04/07 12:34:36 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 57 bytes
09/04/07 12:34:36 INFO mapred.LocalJobRunner:
09/04/07 12:34:36 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
09/04/07 12:34:36 INFO mapred.LocalJobRunner:
09/04/07 12:34:36 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
09/04/07 12:34:36 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to file:/Users/tom/workspace/htdg/output
09/04/07 12:34:36 INFO mapred.LocalJobRunner: reduce > reduce
09/04/07 12:34:36 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
09/04/07 12:34:36 INFO mapred.JobClient: map 100% reduce 100%
09/04/07 12:34:36 INFO mapred.JobClient: Job complete: job_local_0001
09/04/07 12:34:36 INFO mapred.JobClient: Counters: 13
09/04/07 12:34:36 INFO mapred.JobClient: FileSystemCounters
09/04/07 12:34:36 INFO mapred.JobClient: FILE_BYTES_READ=27571
09/04/07 12:34:36 INFO mapred.JobClient: FILE_BYTES_WRITTEN=53907
09/04/07 12:34:36 INFO mapred.JobClient: Map-Reduce Framework
09/04/07 12:34:36 INFO mapred.JobClient: Reduce input groups=2
09/04/07 12:34:36 INFO mapred.JobClient: Combine output records=0
09/04/07 12:34:36 INFO mapred.JobClient: Map input records=5
09/04/07 12:34:36 INFO mapred.JobClient: Reduce shuffle bytes=0
09/04/07 12:34:36 INFO mapred.JobClient: Reduce output records=2
09/04/07 12:34:36 INFO mapred.JobClient: Spilled Records=10
09/04/07 12:34:36 INFO mapred.JobClient: Map output bytes=45
09/04/07 12:34:36 INFO mapred.JobClient: Map input bytes=529
09/04/07 12:34:36 INFO mapred.JobClient: Combine input records=0
09/04/07 12:34:36 INFO mapred.JobClient: Map output records=5
09/04/07 12:34:36 INFO mapred.JobClient: Reduce input records=5
When the hadoop command is invoked with a
classname as the first argument, it launches a JVM to run the class.
It is more convenient to use hadoop than straight
java since the former adds the Hadoop libraries
(and their dependencies) to the classpath, and picks up the Hadoop
configuration too. To add the application classes to the classpath,
we’ve defined an environment variable called HADOOP_CLASSPATH, which the
hadoop script picks up.
Note
When running in local (standalone) mode, the programs in
this book all assume that you have set the HADOOP_CLASSPATH in this way. The
commands should be run from the directory that the example code is
installed in.
The output from running the job provides some useful
information. (The warning about the job JAR file not being found is
expected, since we are running in local mode without a JAR. We won’t
see this warning when we run on a cluster.) For example, we can see
that the job was given an ID of job_local_0001, and it ran one map task
and one reduce task (with the IDs attempt_local_0001_m_000000_0 and attempt_local_0001_r_000000_0). Knowing
the job and task IDs can be very useful when debugging MapReduce
jobs.
The last section of the output, entitled “Counters,” shows the statistics that Hadoop generates for each job it runs. These are very useful for checking whether the amount of data processed is what you expected. For example, we can follow the number of records that went through the system: five map inputs produced five map outputs, then five reduce inputs in two groups produced two reduce outputs.
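The record flow behind these counters can be reproduced in memory. The hypothetical sketch below starts from already-extracted (year, temperature) pairs rather than raw lines, with 9999 marking a missing reading. It runs a map, group, and reduce over them and tallies each stage using names modeled on the counters in the job output.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CounterSketch {

    private static final int MISSING = 9999;

    // Runs map, group, and reduce in memory and counts records at each stage.
    static Map<String, Integer> countRecords(List<Map.Entry<Integer, Integer>> readings) {
        // Map: keep readings that are not flagged as missing
        List<Map.Entry<Integer, Integer>> mapOutput = new ArrayList<>();
        for (Map.Entry<Integer, Integer> reading : readings) {
            if (reading.getValue() != MISSING) {
                mapOutput.add(reading);
            }
        }
        // Shuffle: group the surviving readings by year
        TreeMap<Integer, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<Integer, Integer> pair : mapOutput) {
            groups.computeIfAbsent(pair.getKey(), year -> new ArrayList<>()).add(pair.getValue());
        }
        // Reduce: one output record (the maximum) per group
        int reduceInputRecords = 0;
        List<Map.Entry<Integer, Integer>> reduceOutput = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> group : groups.entrySet()) {
            reduceInputRecords += group.getValue().size();
            int max = Integer.MIN_VALUE;
            for (int temperature : group.getValue()) {
                max = Math.max(max, temperature);
            }
            reduceOutput.add(Map.entry(group.getKey(), max));
        }
        Map<String, Integer> counters = new LinkedHashMap<>();
        counters.put("Map input records", readings.size());
        counters.put("Map output records", mapOutput.size());
        counters.put("Reduce input groups", groups.size());
        counters.put("Reduce input records", reduceInputRecords);
        counters.put("Reduce output records", reduceOutput.size());
        return counters;
    }

    public static void main(String[] args) {
        System.out.println(countRecords(List.of(
            Map.entry(1950, 0), Map.entry(1950, 22), Map.entry(1950, -11),
            Map.entry(1949, 111), Map.entry(1949, 78))));
    }
}
```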
The output was written to the output directory, which contains one
output file per reducer. The job had a single reducer, so we find a
single file, named part-00000:
% cat output/part-00000
1949	111
1950	22
This result is the same as when we went through it by hand earlier. Since the dataset records air temperatures in tenths of a degree Celsius, we interpret this as saying that the maximum temperature recorded in 1949 was 11.1°C, and in 1950 it was 2.2°C.
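The conversion is a division by ten. A small sketch with a hypothetical helper name makes it explicit. Locale.ROOT keeps the decimal separator a period regardless of the machine's locale.

```java
import java.util.Locale;

class TemperatureFormat {

    // Temperatures in the dataset are integers in tenths of a degree Celsius.
    static double toCelsius(int tenths) {
        return tenths / 10.0;
    }

    // Formats a reading with one decimal place, e.g. 111 -> "11.1°C".
    static String format(int tenths) {
        return String.format(Locale.ROOT, "%.1f\u00B0C", toCelsius(tenths));
    }

    public static void main(String[] args) {
        System.out.println(format(111)); // 11.1°C
        System.out.println(format(22));  // 2.2°C
    }
}
```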
Release 0.20.0 of Hadoop included a new Java MapReduce API, sometimes referred to as “Context Objects,” designed to make the API easier to evolve in the future. The new API is type-incompatible with the old, however, so applications need to be rewritten to take advantage of it.[18]
There are several notable differences between the two APIs:
- The new API favors abstract classes over interfaces, since these are easier to evolve. For example, you can add a method (with a default implementation) to an abstract class without breaking old implementations of the class. In the new API, Mapper and Reducer are abstract classes rather than interfaces.
- The new API is in the org.apache.hadoop.mapreduce package (and subpackages). The old API is found in org.apache.hadoop.mapred.
- The new API makes extensive use of context objects that allow the user code to communicate with the MapReduce system. The MapContext, for example, essentially unifies the role of the JobConf, the OutputCollector, and the Reporter.
- The new API supports both a “push” and a “pull” style of iteration. In both APIs, key-value record pairs are pushed to the mapper, but in addition, the new API allows a mapper to pull records from within the map() method. The same goes for the reducer. An example of how the “pull” style can be useful is processing records in batches, rather than one by one.
- Configuration has been unified. The old API has a special JobConf object for job configuration, which is an extension of Hadoop’s vanilla Configuration object. In the new API, this distinction is dropped, so job configuration is done through a Configuration.
- Job control is performed through the Job class, rather than JobClient, which no longer exists in the new API.
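The difference between the two iteration styles can be sketched in plain Java. In the push style, a driver loop hands over one record at a time. In the pull style, the user code receives the record source and iterates itself, which makes batching straightforward. In the new API, Mapper's overridable run() method is the natural place for such a loop. The class, method names, and batch size below are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

class PushPullSketch {

    // Push style: the framework drives the loop, one record per call.
    static void push(List<String> records, Consumer<String> map) {
        for (String record : records) {
            map.accept(record);
        }
    }

    // Pull style: user code owns the loop and consumes records in batches.
    static List<List<String>> pullInBatches(Iterator<String> input, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        while (input.hasNext()) {
            batch.add(input.next());
            if (batch.size() == batchSize) {
                batches.add(batch);
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) {
            batches.add(batch); // final partial batch
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> records = List.of("r1", "r2", "r3", "r4", "r5");
        push(records, r -> System.out.println("map(" + r + ")"));
        System.out.println(pullInBatches(records.iterator(), 2)); // [[r1, r2], [r3, r4], [r5]]
    }
}
```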
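Example 2.6 shows the MaxTemperature application rewritten to
use the new API. The main differences are the package names, the fact that Mapper and Reducer are extended rather than implemented, the use of Context in place of OutputCollector and Reporter, and the use of Job to configure and run the job.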
Example 2.6. Application to find the maximum temperature in the weather dataset using the new context objects MapReduce API
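import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewMaxTemperature {

  static class NewMaxTemperatureMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      String year = line.substring(15, 19);
      int airTemperature;
      if (line.charAt(87) == '+') { // Integer.parseInt rejected a leading '+' before Java 7
        airTemperature = Integer.parseInt(line.substring(88, 92));
      } else {
        airTemperature = Integer.parseInt(line.substring(87, 92));
      }
      String quality = line.substring(92, 93);
      if (airTemperature != MISSING && quality.matches("[01459]")) {
        // Context takes the place of the old API's OutputCollector
        context.write(new Text(year), new IntWritable(airTemperature));
      }
    }
  }

  static class NewMaxTemperatureReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int maxValue = Integer.MIN_VALUE;
      for (IntWritable value : values) {
        maxValue = Math.max(maxValue, value.get());
      }
      context.write(key, new IntWritable(maxValue));
    }
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: NewMaxTemperature <input path> <output path>");
      System.exit(-1);
    }
    // Job takes over the roles of both JobConf and JobClient
    Job job = new Job();
    job.setJarByClass(NewMaxTemperature.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(NewMaxTemperatureMapper.class);
    job.setReducerClass(NewMaxTemperatureReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Submit the job, wait for it to finish, and exit with 0 on success
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

[18] At the time of this writing, not all of the MapReduce libraries in Hadoop have been ported to work with the new API. This book uses the old API for this reason. However, a copy of all of the examples in this book, rewritten to use the new API, will be made available on the book’s website.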
Learn more about this topic from Hadoop: The Definitive Guide.