Introduction to MapReduce Workflows

Posted Oct 28 2009 11:47 AM

So far in this chapter, you have seen the mechanics of writing a program using MapReduce. We haven’t yet considered how to turn a data processing problem into the MapReduce model.

The data processing you have seen so far in this book solves a fairly simple problem (finding the maximum recorded temperature for given years). When the processing gets more complex, the complexity generally shows up as more MapReduce jobs, rather than as more complex map and reduce functions. In other words, as a rule of thumb, think about adding more jobs, rather than adding complexity to jobs.

For more complex problems, it is worth considering a higher-level language than MapReduce, such as Pig, Hive, or Cascading. One immediate benefit is that it frees you up from having to do the translation into MapReduce jobs, allowing you to concentrate on the analysis you are performing.

Decomposing a Problem into MapReduce Jobs

Let’s look at an example of a more complex problem that we want to translate into a MapReduce workflow.

Imagine that we want to find the mean maximum recorded temperature for every day of the year and every weather station. In concrete terms, to calculate the mean maximum daily temperature recorded by station 029070-99999, say, on January 1, we take the mean of the maximum daily temperatures for this station for January 1, 1901; January 1, 1902; and so on up to January 1, 2000.

How can we compute this using MapReduce? The computation decomposes most naturally into two stages:

1. Compute the maximum daily temperature for every station-date pair.

The MapReduce program in this case is a variant of the maximum temperature program, except that the keys in this case are a composite station-date pair, rather than just the year.

2. Compute the mean of the maximum daily temperatures for every station-day-month key.

The mapper takes the output from the previous job (station-date, maximum temperature) records, and projects it into (station-day-month, maximum temperature) records by dropping the year component. The reduce function then takes the mean of the maximum temperatures, for each station-day-month key.

The output from the first stage looks like this for the station we are interested in. (The `mean_max_daily_temp.sh` script in the examples provides an implementation in Hadoop Streaming.)

```
029070-99999	19010101	0
029070-99999	19020101	-94
...
```

The first two fields form the key, and the final column is the maximum temperature from all the readings for the given station and date. The second stage averages these daily maxima over years to yield:

`029070-99999	0101	-68`

which is interpreted as saying the mean maximum daily temperature on January 1 for station 029070-99999 over the century is −6.8°C.
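To make the decomposition concrete, here is a minimal sketch of the two stages in plain Java, with no Hadoop involved. It is an illustration rather than the book's implementation: the class and method names are hypothetical, and the sample readings in `main` are made up. Each method plays the role of one job. The first groups by the station-date key and keeps the maximum, and the second drops the year from the key and averages.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of the two-stage decomposition; this is not Hadoop code.
class MeanMaxDailyTemp {

    // Stage 1: maximum temperature for every station-date key.
    // Each input line is "station<TAB>yyyyMMdd<TAB>temperature" (tenths of a degree Celsius).
    static Map<String, Integer> maxPerStationDate(List<String> readings) {
        Map<String, Integer> max = new TreeMap<>();
        for (String line : readings) {
            String[] fields = line.split("\t");
            String key = fields[0] + "\t" + fields[1];
            max.merge(key, Integer.parseInt(fields[2]), Integer::max);
        }
        return max;
    }

    // Stage 2: drop the year from each key, then average the daily maxima.
    static Map<String, Double> meanPerStationDayMonth(Map<String, Integer> dailyMax) {
        Map<String, int[]> sumAndCount = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : dailyMax.entrySet()) {
            String[] key = entry.getKey().split("\t");
            String dayMonthKey = key[0] + "\t" + key[1].substring(4); // keep MMdd only
            int[] acc = sumAndCount.computeIfAbsent(dayMonthKey, k -> new int[2]);
            acc[0] += entry.getValue(); // running sum
            acc[1]++;                   // running count
        }
        Map<String, Double> mean = new TreeMap<>();
        for (Map.Entry<String, int[]> entry : sumAndCount.entrySet()) {
            int[] acc = entry.getValue();
            mean.put(entry.getKey(), (double) acc[0] / acc[1]);
        }
        return mean;
    }

    public static void main(String[] args) {
        // Made-up sample readings, for illustration only.
        List<String> readings = Arrays.asList(
                "029070-99999\t19010101\t-50",
                "029070-99999\t19010101\t0",
                "029070-99999\t19020101\t-94",
                "029070-99999\t19020101\t-120");
        Map<String, Double> means = meanPerStationDayMonth(maxPerStationDate(readings));
        means.forEach((key, mean) -> System.out.println(key + "\t" + mean));
    }
}
```

Because the second method consumes only the output of the first, either one can be changed or reused without touching the other, which is the same property that makes the two-job workflow easy to maintain.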

It’s possible to do this computation in one MapReduce stage, but it takes more work on the part of the programmer.

The arguments for having more (but simpler) MapReduce stages are that doing so leads to more composable and more maintainable mappers and reducers.

It’s possible to make map and reduce functions even more composable than we have done. A mapper commonly performs input format parsing, projection (selecting the relevant fields), and filtering (removing records that are not of interest). In the mappers you have seen so far, we have implemented all of these functions in a single mapper. However, there is a case for splitting these into distinct mappers and chaining them into a single mapper using the `ChainMapper` library class that comes with Hadoop. Used together with `ChainReducer`, this lets you run a chain of mappers, followed by a reducer and another chain of mappers, in a single MapReduce job.
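The idea of splitting parsing, filtering, and projection into separate units can be illustrated without Hadoop. The following is only an analogy in plain Java and does not use the `ChainMapper` API: the `Stage` interface, its `then` method, the comma-separated record layout, and the quality flag are all assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java analogy for chaining small mappers; this is not the ChainMapper API.
class MapperChainSketch {

    // A stage turns one input record into zero or more output records.
    interface Stage<I, O> {
        List<O> map(I input);

        // Feeds every record emitted by this stage into the next stage.
        default <R> Stage<I, R> then(Stage<O, R> next) {
            return input -> {
                List<R> out = new ArrayList<>();
                for (O intermediate : map(input)) {
                    out.addAll(next.map(intermediate));
                }
                return out;
            };
        }
    }

    // Parsing: split a raw line into fields
    // (hypothetical layout: station,date,temperature,quality).
    static final Stage<String, String[]> PARSE =
            line -> Collections.singletonList(line.split(","));

    // Filtering: keep only records whose (hypothetical) quality flag is "1".
    static final Stage<String[], String[]> FILTER = fields -> {
        if (fields[3].equals("1")) {
            return Collections.singletonList(fields);
        }
        return Collections.<String[]>emptyList();
    };

    // Projection: keep station, date, and temperature; drop the quality flag.
    static final Stage<String[], String> PROJECT =
            fields -> Collections.singletonList(fields[0] + "\t" + fields[1] + "\t" + fields[2]);

    // The three small stages chained into one.
    static final Stage<String, String> CHAIN = PARSE.then(FILTER).then(PROJECT);

    public static void main(String[] args) {
        System.out.println(CHAIN.map("029070-99999,19010101,-50,1"));  // record kept
        System.out.println(CHAIN.map("029070-99999,19010101,9999,0")); // record filtered out
    }
}
```

Each stage can be tested on its own and recombined in a different order or with a different parser, which is the case for keeping them distinct.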

Running Dependent Jobs

When there is more than one job in a MapReduce workflow, the question arises: how do you manage the jobs so they are executed in order? There are several approaches, and the main consideration is whether you have a linear chain of jobs, or a more complex directed acyclic graph (DAG) of jobs.

For a linear chain, the simplest approach is to run each job one after another, waiting until a job completes successfully before running the next.

```
JobClient.runJob(conf1);
JobClient.runJob(conf2);
```

If a job fails, the `runJob()` method will throw an `IOException`, so later jobs in the pipeline don’t get executed. Depending on your application, you might want to catch the exception and clean up any intermediate data that was produced by any previous jobs.
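The catch-and-clean-up pattern can be sketched as follows. To keep the example self-contained it uses a hypothetical `Step` interface as a stand-in for a job rather than real Hadoop classes, and the cleanup action is left to the caller. In a real pipeline it would delete the intermediate output of the earlier jobs.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of a linear pipeline with cleanup on failure; not Hadoop code.
class LinearPipelineSketch {

    // Hypothetical stand-in for one job. Like runJob(), run() throws IOException on failure.
    interface Step {
        void run() throws IOException;
    }

    // Runs the steps in order. On the first failure, runs the cleanup action,
    // then rethrows so that the caller still sees the error.
    static void runInOrder(List<Step> steps, Runnable cleanup) throws IOException {
        try {
            for (Step step : steps) {
                step.run();
            }
        } catch (IOException e) {
            cleanup.run();
            throw e;
        }
    }

    public static void main(String[] args) {
        List<Step> steps = new ArrayList<>();
        steps.add(() -> System.out.println("job 1 finished"));
        steps.add(() -> { throw new IOException("job 2 failed"); });
        steps.add(() -> System.out.println("job 3 finished")); // never reached
        try {
            runInOrder(steps, () -> System.out.println("deleting intermediate output"));
        } catch (IOException e) {
            System.out.println("pipeline stopped: " + e.getMessage());
        }
    }
}
```

Rethrowing after the cleanup keeps the failure visible to whatever launched the pipeline, so a wrapper script or scheduler can still react to it.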

For anything more complex than a linear chain, there are libraries that can help orchestrate your workflow (although they are also suited to linear chains, or even one-off jobs). The simplest is in the `org.apache.hadoop.mapred.jobcontrol` package: the `JobControl` class. An instance of `JobControl` represents a graph of jobs to be run. You add the job configurations, then tell the `JobControl` instance the dependencies between jobs. You run the `JobControl` in a thread, and it runs the jobs in dependency order. You can poll for progress, and when the jobs have finished, you can query for all the jobs’ statuses, and the associated errors for any failures. If a job fails, `JobControl` won’t run the jobs that depend on it.
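The behavior described here (run jobs in dependency order, and don't run anything downstream of a failure) can be sketched in a few lines of plain Java. This is not the `JobControl` API: the class, its `addJob` and `runAll` methods, and the `State` enum are hypothetical names chosen for illustration, and the sketch runs in the calling thread rather than in a thread of its own.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Plain-Java sketch of dependency-ordered execution; this is not the JobControl API.
class DependencyRunnerSketch {

    enum State { WAITING, SUCCEEDED, FAILED, SKIPPED }

    private final Map<String, BooleanSupplier> jobs = new LinkedHashMap<>();
    private final Map<String, List<String>> dependsOn = new HashMap<>();

    // Registers a job (its body returns true on success) and the jobs it depends on.
    void addJob(String name, BooleanSupplier body, String... dependencies) {
        jobs.put(name, body);
        dependsOn.put(name, Arrays.asList(dependencies));
    }

    // Makes repeated passes over the jobs, running each one whose dependencies
    // have all succeeded and skipping each one with a failed or skipped dependency.
    Map<String, State> runAll() {
        Map<String, State> states = new LinkedHashMap<>();
        for (String name : jobs.keySet()) {
            states.put(name, State.WAITING);
        }
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (String name : jobs.keySet()) {
                if (states.get(name) != State.WAITING) {
                    continue;
                }
                boolean ready = true;
                boolean blocked = false;
                for (String dependency : dependsOn.get(name)) {
                    State s = states.get(dependency);
                    if (s == State.FAILED || s == State.SKIPPED) {
                        blocked = true;
                    } else if (s != State.SUCCEEDED) {
                        ready = false;
                    }
                }
                if (blocked) {
                    states.put(name, State.SKIPPED);
                    progressed = true;
                } else if (ready) {
                    boolean ok = jobs.get(name).getAsBoolean();
                    states.put(name, ok ? State.SUCCEEDED : State.FAILED);
                    progressed = true;
                }
            }
        }
        return states;
    }

    public static void main(String[] args) {
        DependencyRunnerSketch runner = new DependencyRunnerSketch();
        runner.addJob("max-per-day", () -> true);
        runner.addJob("mean-per-day", () -> false, "max-per-day"); // simulated failure
        runner.addJob("report", () -> true, "mean-per-day");
        System.out.println(runner.runAll());
    }
}
```

In the `main` method, the second job is made to fail, so the third job, which depends on it, is reported as `SKIPPED` rather than being run. The returned map plays the role of the status query made once the jobs have finished.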

Unlike `JobControl`, which runs on the client machine submitting the jobs, the Hadoop Workflow Scheduler (HWS)[41] runs as a server, and a client submits a workflow to the scheduler. When the workflow completes, the scheduler can make an HTTP callback to the client to inform it of the jobs’ statuses. HWS can run different types of jobs in the same workflow: a Pig job followed by a Java MapReduce job, for example.

[41] At the time of this writing, the Hadoop Workflow Scheduler is still under development. See https://issues.apach...wse/HADOOP-5303.
