So far in this chapter, you have seen the mechanics of writing a program using MapReduce. We haven’t yet considered how to turn a data processing problem into the MapReduce model.
The data processing you have seen so far in this book is to solve a fairly simple problem (finding the maximum recorded temperature for given years). When the processing gets more complex, this complexity is generally manifested by having more MapReduce jobs, rather than having more complex map and reduce functions. In other words, as a rule of thumb, think about adding more jobs, rather than adding complexity to jobs.
For more complex problems, it is worth considering a higher-level language than MapReduce, such as Pig, Hive, or Cascading. One immediate benefit is that it frees you up from having to do the translation into MapReduce jobs, allowing you to concentrate on the analysis you are performing.
Let’s look at an example of a more complex problem that we want to translate into a MapReduce workflow.
Imagine that we want to find the mean maximum recorded temperature for every day of the year and every weather station. In concrete terms, to calculate the mean maximum daily temperature recorded by station 029070-99999, say, on January 1, we take the mean of the maximum daily temperatures for this station for January 1, 1901; January 1, 1902; and so on up to January 1, 2000.
How can we compute this using MapReduce? The computation decomposes most naturally into two stages:
Compute the maximum daily temperature for every station-date pair.
The MapReduce program in this case is a variant of the maximum temperature program, except that the keys in this case are a composite station-date pair, rather than just the year.
Compute the mean of the maximum daily temperatures for every station-day-month key.
The mapper takes the output from the previous job (station-date, maximum temperature) records, and projects it into (station-day-month, maximum temperature) records by dropping the year component. The reduce function then takes the mean of the maximum temperatures, for each station-day-month key.
The output from first stage looks like this for the station we
are interested in. (The mean_max_daily_temp.sh script in the
examples provides an implementation in Hadoop Streaming.)
029070-99999 19010101 0 029070-99999 19020101 -94 ...
The first two fields form the key, and the final column is the maximum temperature from all the readings for the given station and date. The second stage averages these daily maxima over years to yield:
029070-99999 0101 -68
which is interpreted as saying the mean maximum daily temperature on January 1 for station 029070-99999 over the century is −6.8°C.
It’s possible to do this computation in one MapReduce stage, but it takes more work on the part of the programmer.
The arguments for having more (but simpler) MapReduce stages are that doing so leads to more composable and more maintainable mappers and reducers.
It’s possible to make map and reduce functions even more
composable than we have done. A mapper commonly performs input format
parsing, projection (selecting the relevant fields), and filtering
(removing records that are not of interest). In the mappers you have
seen so far, we have implemented all of these functions in a single
mapper. However, there is a case for splitting these into distinct
mappers and chaining them into a single mapper using the ChainMapper library class that comes with
Hadoop. Combined with a ChainReducer, you can run a chain of
mappers, followed by a reducer and another chain of mappers in a
single MapReduce job.
When there is more than one job in a MapReduce workflow, the question arises: how do you manage the jobs so they are executed in order? There are several approaches, and the main consideration is whether you have a linear chain of jobs, or a more complex directed acyclic graph (DAG) of jobs.
For a linear chain, the simplest approach is to run each job one after another, waiting until a job completes successfully before running the next.
JobClient.runJob(conf1); JobClient.runJob(conf2);
If a job fails, the runJob() method
will throw an IOException, so later
jobs in the pipeline don’t get executed. Depending on your
application, you might want to catch the exception and clean up any
intermediate data that was produced by any previous jobs.
For anything more complex than a linear chain, there are
libraries that can help orchestrate your workflow (although they are
suited to linear chains, or even one-off jobs, too). The simplest is
in the org.apache.hadoop.mapred.jobcontrol package:
the JobControl
class. An instance of JobControl
represents a graph of jobs to be run. You add the job configurations,
then tell the JobControl instance
the dependencies between jobs. You run the JobControl in a thread, and it runs the jobs
in dependency order. You can poll for progress, and when the jobs have
finished, you can query for all the jobs’ statuses, and the associated
errors for any failures. If a job fails, JobControl won’t run its
dependencies.
Unlike JobControl which runs
on the client machine submitting the jobs, the Hadoop
Workflow Scheduler (HWS)[41] runs as a server, and a client submits a workflow to the
scheduler. When the workflow completes the scheduler can make an HTTP
callback to the client to inform it of the jobs’ statuses. HWS can run
different types of jobs in the same workflow: a Pig job followed by a
Java MapReduce job, for example.
[41] At the time of this writing, the Hadoop Workflow Scheduler is still under development. See https://issues.apach...wse/HADOOP-5303.
Learn more about this topic from Hadoop: The Definitive Guide.
Apache Hadoop is ideal for organizations with a growing need to process massive application datasets. Hadoop: The Definitive Guide is a comprehensive resource for using Hadoop to build reliable, scalable, distributed systems. Programmers will find details for analyzing large datasets with Hadoop, and administrators will learn how to set up and run Hadoop clusters. The book includes case studies that illustrate how Hadoop is used to solve specific problems.

Help



