You can run a MapReduce job with a single line of code:
JobClient.runJob(conf). It’s very short, but it
conceals a great deal of processing behind the scenes. This section
uncovers the steps Hadoop takes to run a job.
The whole process is illustrated in Figure 6.1. At the highest level, there are four independent entities:
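The client, which submits the MapReduce job.
The jobtracker, which coordinates the job run.
The tasktrackers, which run the tasks that the job has been split into.
The distributed filesystem, which is used for sharing job files between the other entities.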
The runJob() method on
JobClient is a convenience method that creates a new
JobClient instance and calls
submitJob() on it (step 1 in
Figure 6.1). Having submitted the job,
runJob() polls the job’s progress once a
second and reports the progress to the console if it has changed
since the last report. When the job is complete, if it was successful,
the job counters are displayed. Otherwise, the error that caused the
job to fail is logged to the console.
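The polling behaviour of runJob() can be sketched as a small loop. This is a simplified model, not Hadoop’s code: the JobHandle interface is hypothetical (it only mirrors a few method names of Hadoop’s RunningJob), and the poll interval is a parameter so that the example can run without waiting a second per poll.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a submitted job; in Hadoop the client-side
// handle for a running job is RunningJob.
interface JobHandle {
    boolean isComplete();
    boolean isSuccessful();
    float mapProgress();     // 0.0 to 1.0
    float reduceProgress();  // 0.0 to 1.0
}

class ProgressPoller {
    // Poll every pollMillis ms, recording a progress line only when it
    // differs from the previous one; finish with a success/failure line.
    static List<String> pollUntilDone(JobHandle job, long pollMillis) {
        List<String> reports = new ArrayList<>();
        String last = null;
        while (true) {
            boolean done = job.isComplete();
            String report = "map " + Math.round(job.mapProgress() * 100)
                    + "% reduce " + Math.round(job.reduceProgress() * 100) + "%";
            if (!report.equals(last)) {
                reports.add(report);  // a real client prints this to the console
                last = report;
            }
            if (done) {
                break;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return reports;  // stop polling without claiming an outcome
            }
        }
        reports.add(job.isSuccessful() ? "Job complete" : "Job failed");
        return reports;
    }
}
```

Reporting only on change keeps the console quiet during long stretches where the map or reduce percentage does not move.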
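The job submission process implemented by JobClient’s submitJob() method does the following:
Asks the jobtracker for a new job ID (step 2).
Checks the output specification of the job. For example, if the output directory has not been specified or it already exists, the job is not submitted and an error is thrown to the MapReduce program.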
Computes the input splits for the job. If the splits cannot be computed (because the input paths don’t exist, for example), the job is not submitted and an error is thrown to the MapReduce program.
Copies the resources needed to run the job, including the job JAR file, the configuration file, and the computed input splits, to the jobtracker’s filesystem in a directory named after the job ID. The job JAR is copied with a high replication factor (controlled by the mapred.submit.replication property, which defaults to 10) so that there are lots of copies across the cluster for the tasktrackers to access when they run tasks for the job (step 3).
Tells the jobtracker that the job is ready for execution by calling submitJob() on JobTracker (step 4).
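The first two checks can be sketched with plain java.nio.file calls. This is a simplification under stated assumptions: the SubmissionChecks and Split names are hypothetical, local paths stand in for the distributed filesystem, and splits are cut at a fixed size, whereas in Hadoop the job’s InputFormat decides how splits are computed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

class SubmissionChecks {
    // A split is a byte range of one input file (hypothetical shape).
    static final class Split {
        final Path file;
        final long start;
        final long length;
        Split(Path file, long start, long length) {
            this.file = file;
            this.start = start;
            this.length = length;
        }
    }

    // Refuse the job if the output directory is unset or already exists.
    static void checkOutputSpec(Path outputDir) {
        if (outputDir == null) {
            throw new IllegalStateException("Output directory not set");
        }
        if (Files.exists(outputDir)) {
            throw new IllegalStateException("Output directory " + outputDir + " already exists");
        }
    }

    // Cut each input into fixed-size splits; refuse the job if an input is missing.
    static List<Split> computeSplits(List<Path> inputs, long splitSize) {
        List<Split> splits = new ArrayList<>();
        for (Path in : inputs) {
            if (!Files.exists(in)) {
                throw new IllegalStateException("Input path does not exist: " + in);
            }
            long size;
            try {
                size = Files.size(in);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            for (long start = 0; start < size; start += splitSize) {
                splits.add(new Split(in, start, Math.min(splitSize, size - start)));
            }
        }
        return splits;
    }
}
```

Both checks run on the client, so a misconfigured job fails fast, before anything is copied to the cluster.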
When the JobTracker receives a call to its
submitJob() method, it puts the job into an internal
queue from where the job scheduler will pick it up and initialize it.
Initialization involves creating an object to represent the job being
run, which encapsulates its tasks, and bookkeeping information to keep
track of the tasks’ status and progress (step 5).
To create the list of tasks to run, the job scheduler
first retrieves the input splits computed by the
JobClient from the shared filesystem (step 6). It
then creates one map task for each split. The number of reduce tasks
to create is determined by the
mapred.reduce.tasks property in the
JobConf, which is set by the
setNumReduceTasks() method, and the scheduler
simply creates this number of reduce tasks to be run. Tasks are given
IDs at this point.
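Job initialization, from the internal queue through task creation, can be modelled in a few lines. The JobInit, Submission, and JobInProgress classes and the string status values are hypothetical; the ID format is modelled on Hadoop task IDs such as task_200904110811_0002_m_000003, where m marks a map task and r a reduce task.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

class JobInit {
    // What the client hands over at submission time (hypothetical shape).
    static final class Submission {
        final String jobId;
        final int numSplits;       // one map task is created per split
        final int numReduceTasks;  // as set by setNumReduceTasks()
        Submission(String jobId, int numSplits, int numReduceTasks) {
            this.jobId = jobId;
            this.numSplits = numSplits;
            this.numReduceTasks = numReduceTasks;
        }
    }

    // Bookkeeping object for an initialized job: task ID -> status.
    static final class JobInProgress {
        final String jobId;
        final Map<String, String> taskStatus = new LinkedHashMap<>();
        JobInProgress(String jobId) {
            this.jobId = jobId;
        }
    }

    // Internal queue in which submitted jobs wait for the scheduler.
    private final LinkedBlockingQueue<Submission> queue = new LinkedBlockingQueue<>();

    void submitJob(String jobId, int numSplits, int numReduceTasks) {
        queue.add(new Submission(jobId, numSplits, numReduceTasks));
    }

    // Scheduler side: take the next queued job (or null) and create its tasks.
    JobInProgress initNext() {
        Submission s = queue.poll();
        if (s == null) {
            return null;
        }
        JobInProgress job = new JobInProgress(s.jobId);
        String prefix = s.jobId.replaceFirst("^job_", "task_");
        for (int i = 0; i < s.numSplits; i++) {
            job.taskStatus.put(String.format(Locale.ROOT, "%s_m_%06d", prefix, i), "PENDING");
        }
        for (int i = 0; i < s.numReduceTasks; i++) {
            job.taskStatus.put(String.format(Locale.ROOT, "%s_r_%06d", prefix, i), "PENDING");
        }
        return job;
    }
}
```

Note that the map-task count comes from the data (the splits), while the reduce-task count comes purely from configuration.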
Tasktrackers run a simple loop that periodically sends heartbeat method calls to the jobtracker. Heartbeats tell the jobtracker that a tasktracker is alive, but they also double as a channel for messages. As a part of the heartbeat, a tasktracker will indicate whether it is ready to run a new task, and if it is, the jobtracker will allocate it a task, which it communicates to the tasktracker using the heartbeat return value (step 7).
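The heartbeat exchange can be sketched as a method call whose return value carries any new work. The HeartbeatSketch and JobTrackerStub names and the plain queue of pending task IDs are illustrative assumptions; the real protocol carries much more state in both directions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;

class HeartbeatSketch {
    // Jobtracker side: every heartbeat proves liveness; the return value
    // doubles as the channel for handing out a task.
    static final class JobTrackerStub {
        final Map<String, Long> lastSeen = new HashMap<>();
        final Queue<String> pendingTasks = new ArrayDeque<>();

        Optional<String> heartbeat(String tracker, boolean readyForTask, long now) {
            lastSeen.put(tracker, now);
            if (!readyForTask) {
                return Optional.empty();
            }
            return Optional.ofNullable(pendingTasks.poll());
        }
    }

    // Tasktracker side: a loop of heartbeats (a real tracker sleeps between
    // beats); it declares itself ready while it has a free slot.
    static List<String> runBeats(JobTrackerStub jt, String name, int beats, int slots) {
        List<String> running = new ArrayList<>();
        for (long t = 0; t < beats; t++) {
            boolean ready = running.size() < slots;
            jt.heartbeat(name, ready, t).ifPresent(running::add);
        }
        return running;
    }
}
```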
Before it can choose a task for the tasktracker, the jobtracker must choose a job to select the task from. There are various scheduling algorithms as explained later in this chapter, but the default one simply maintains a priority list of jobs. Having chosen a job, the jobtracker now chooses a task for the job.
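A priority list of jobs can be sketched with a sorted set. This assumes, as a simplification, that jobs are ordered by priority and that ties are broken by submission order; the priority names follow Hadoop’s JobPriority enum, while the JobPriorityList class and its chooseJob() method are hypothetical.

```java
import java.util.Comparator;
import java.util.TreeSet;
import java.util.function.Predicate;

class JobPriorityList {
    // Priority levels, highest first (names as in Hadoop's JobPriority enum).
    enum Priority { VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW }

    static final class QueuedJob {
        final String id;
        final Priority priority;
        final long submitOrder;
        QueuedJob(String id, Priority priority, long submitOrder) {
            this.id = id;
            this.priority = priority;
            this.submitOrder = submitOrder;
        }
    }

    // Higher priority first; among equal priorities, earlier submission first.
    private static final Comparator<QueuedJob> ORDER =
            Comparator.comparing((QueuedJob j) -> j.priority)
                      .thenComparingLong(j -> j.submitOrder);

    private final TreeSet<QueuedJob> jobs = new TreeSet<>(ORDER);
    private long nextOrder = 0;

    void add(String id, Priority priority) {
        jobs.add(new QueuedJob(id, priority, nextOrder++));
    }

    // Walk the list in order and return the first job that has a task to run.
    String chooseJob(Predicate<String> hasRunnableTask) {
        for (QueuedJob j : jobs) {
            if (hasRunnableTask.test(j.id)) {
                return j.id;
            }
        }
        return null;
    }
}
```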
Tasktrackers have a fixed number of slots for map tasks and for reduce tasks: for example, a tasktracker may be able to run two map tasks and two reduce tasks simultaneously. The default scheduler fills empty map task slots before reduce task slots, so if the tasktracker has at least one empty map task slot, the jobtracker will select a map task; otherwise, it will select a reduce task.
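The slot rule of the default scheduler reduces to a two-line decision. The SlotChooser class and the NONE outcome are hypothetical additions for illustration; a fuller version would also check whether any map tasks are actually pending.

```java
class SlotChooser {
    enum TaskType { MAP, REDUCE, NONE }

    // Any free map slot wins; reduce slots are considered only when
    // every map slot is occupied.
    static TaskType choose(int freeMapSlots, int freeReduceSlots) {
        if (freeMapSlots > 0) {
            return TaskType.MAP;
        }
        if (freeReduceSlots > 0) {
            return TaskType.REDUCE;
        }
        return TaskType.NONE;
    }
}
```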
To choose a reduce task, the jobtracker simply takes the next one in its list of yet-to-be-run reduce tasks, since there are no data locality considerations. For a map task, however, it takes account of the tasktracker’s network location and picks a task whose input split is as close as possible to the tasktracker. In the optimal case, the task is data-local, that is, running on the same node that the split resides on. Alternatively, the task may be rack-local: on the same rack as the split, but not the same node. Some tasks are neither data-local nor rack-local and retrieve their data from a rack other than the one they are running on. You can tell the proportion of each type of task by looking at a job’s counters.
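The locality preference for map tasks can be sketched as a ranking over the pending tasks. The MapTask model (a list of hosts and racks holding each split) and the LocalityPicker class are hypothetical; Hadoop derives this information from the cluster’s network topology.

```java
import java.util.List;

class LocalityPicker {
    // Ordered from best to worst; enum order is used for comparison.
    enum Locality { DATA_LOCAL, RACK_LOCAL, OFF_RACK }

    // A pending map task plus the nodes and racks holding its split.
    static final class MapTask {
        final String id;
        final List<String> splitHosts;
        final List<String> splitRacks;
        MapTask(String id, List<String> splitHosts, List<String> splitRacks) {
            this.id = id;
            this.splitHosts = splitHosts;
            this.splitRacks = splitRacks;
        }
    }

    static Locality locality(MapTask t, String host, String rack) {
        if (t.splitHosts.contains(host)) {
            return Locality.DATA_LOCAL;
        }
        if (t.splitRacks.contains(rack)) {
            return Locality.RACK_LOCAL;
        }
        return Locality.OFF_RACK;
    }

    // Choose the pending map task closest to the tasktracker; earlier tasks win ties.
    static MapTask pick(List<MapTask> pending, String host, String rack) {
        MapTask best = null;
        Locality bestLoc = null;
        for (MapTask t : pending) {
            Locality loc = locality(t, host, rack);
            if (best == null || loc.compareTo(bestLoc) < 0) {
                best = t;
                bestLoc = loc;
            }
        }
        return best;
    }
}
```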
Now that the tasktracker has been assigned a task, the next step is
for it to run the task. First, it localizes the job JAR by copying it
from the shared filesystem to the tasktracker’s filesystem. It also
copies any files needed by the application from the distributed cache
to the local disk (step 8).
Second, it creates a local working directory for the task, and un-jars
the contents of the JAR into this directory. Third, it creates an instance of
TaskRunner to run the task.
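The localization steps (copy the JAR, create a working directory, un-jar it there) can be sketched with java.util.jar. In this sketch, local paths stand in for the shared filesystem, and the TaskLocalizer class, the job.jar file name, and the per-task directory layout are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;

class TaskLocalizer {
    // Copy the job JAR into a fresh per-task directory and unpack it there.
    static Path localize(Path sharedJar, Path localRoot, String taskId) {
        try {
            Path workDir = localRoot.resolve(taskId).normalize();
            Files.createDirectories(workDir);
            Path localJar = workDir.resolve("job.jar");
            Files.copy(sharedJar, localJar, StandardCopyOption.REPLACE_EXISTING);
            try (JarInputStream in = new JarInputStream(Files.newInputStream(localJar))) {
                JarEntry e;
                while ((e = in.getNextJarEntry()) != null) {
                    Path target = workDir.resolve(e.getName()).normalize();
                    if (!target.startsWith(workDir)) {
                        // Refuse entries such as "../x" that would escape the directory.
                        throw new IOException("Bad JAR entry: " + e.getName());
                    }
                    if (e.isDirectory()) {
                        Files.createDirectories(target);
                    } else {
                        Files.createDirectories(target.getParent());
                        // Reads only the current entry's bytes.
                        Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
                    }
                }
            }
            return workDir;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```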
TaskRunner launches a new Java Virtual
Machine (step 9) and runs each task in it (step 10), so that any bugs in
the user-defined map and reduce functions don’t affect the tasktracker
(by causing it to crash or hang, for example). It is, however, possible
to reuse the JVM between tasks.
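Launching a task in its own JVM comes down to building a java command line and starting it as a separate process. The ChildJvmLauncher class, the main-class and task-ID arguments, and the task.log file name are hypothetical; they are not TaskRunner’s actual command line.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class ChildJvmLauncher {
    // Build the command line for a separate JVM that will run one task.
    static List<String> command(String classpath, String mainClass,
                                List<String> jvmOpts, String taskId) {
        List<String> cmd = new ArrayList<>();
        // Use the same Java installation as the parent (tasktracker) JVM.
        cmd.add(System.getProperty("java.home") + File.separator + "bin" + File.separator + "java");
        cmd.addAll(jvmOpts);
        cmd.add("-cp");
        cmd.add(classpath);
        cmd.add(mainClass);
        cmd.add(taskId);
        return cmd;
    }

    // Start the child in the task's working directory, sending its output
    // to a log file. A crash or hang in user code affects only this process,
    // which the parent can kill.
    static Process launch(List<String> cmd, File workDir) {
        try {
            return new ProcessBuilder(cmd)
                    .directory(workDir)
                    .redirectErrorStream(true)
                    .redirectOutput(new File(workDir, "task.log"))
                    .start();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```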
The child process communicates with its parent through the umbilical interface. This way it informs the parent of the task’s progress every few seconds until the task is complete.
Streaming and Pipes both run special map and reduce tasks whose purpose is to launch the user-supplied executable and communicate with it. In the case of Streaming, the Streaming task communicates with the process (which may be written in any language) using standard input and output streams. The Pipes task, on the other hand, listens on a socket and passes the C++ process a port number in its environment, so that on startup, the C++ process can establish a persistent socket connection back to the parent Java Pipes task.
In both cases, during execution of the task, the Java process passes input key-value pairs to the external process, which runs them through the user-defined map or reduce function and passes the output key-value pairs back to the Java process. From the tasktracker’s point of view, it is as if the tasktracker child process ran the map or reduce code itself.
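The Streaming side of this exchange can be sketched with ProcessBuilder: key-value pairs go to the external process as tab-separated lines on its standard input, and its standard output is parsed back into pairs. The StreamingBridge class is hypothetical; only the line-and-tab convention reflects how Streaming frames records by default.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class StreamingBridge {
    static List<String[]> run(List<String> command, List<Map.Entry<String, String>> input) {
        try {
            Process p = new ProcessBuilder(command)
                    .redirectError(ProcessBuilder.Redirect.INHERIT)
                    .start();
            // Write on a separate thread so a full output pipe cannot deadlock us.
            Thread writer = new Thread(() -> {
                try (Writer w = new OutputStreamWriter(p.getOutputStream(), StandardCharsets.UTF_8)) {
                    for (Map.Entry<String, String> kv : input) {
                        w.write(kv.getKey() + "\t" + kv.getValue() + "\n");
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();
            List<String[]> output = new ArrayList<>();
            try (BufferedReader r = new BufferedReader(
                    new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = r.readLine()) != null) {
                    // Key is the text before the first tab; a line with no tab is all key.
                    output.add(line.split("\t", 2));
                }
            }
            writer.join();
            int exit = p.waitFor();
            if (exit != 0) {
                throw new IllegalStateException("external process exited with " + exit);
            }
            return output;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With cat as the external command, this acts as an identity mapper: every pair comes back unchanged.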
MapReduce jobs are long-running batch jobs, taking anything from minutes to hours to run. Because this is a significant length of time, it’s important for the user to get feedback on how the job is progressing. A job and each of its tasks have a status, which includes such things as the state of the job or task (e.g., running, successfully completed, failed), the progress of maps and reduces, the values of the job’s counters, and a status message or description (which may be set by user code). These statuses change over the course of the job, so how do they get communicated back to the client?
When a task is running, it keeps track of its progress, that is, the proportion of the task completed. For map tasks, this is the proportion of the input that has been processed. For reduce tasks, it’s a little more complex, but the system can still estimate the proportion of the reduce input processed. It does this by dividing the total progress into three parts, corresponding to the copy, sort, and reduce phases. For example, if the task has run the reducer on half its input, then the task’s progress is ⅚, since it has completed the copy and sort phases (⅓ each) and is halfway through the reduce phase (⅙).
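The reduce-progress arithmetic amounts to an equally weighted average of the three phases. The ReduceProgress class is hypothetical, and clamping each fraction to [0, 1] is a defensive choice made for this sketch.

```java
class ReduceProgress {
    // Each of the copy, sort, and reduce phases counts for one third.
    static double progress(double copyDone, double sortDone, double reduceDone) {
        return (clamp(copyDone) + clamp(sortDone) + clamp(reduceDone)) / 3.0;
    }

    private static double clamp(double f) {
        return Math.max(0.0, Math.min(1.0, f));
    }
}
```

For the example in the text, progress(1.0, 1.0, 0.5) evaluates to (1 + 1 + 0.5) / 3, which is ⅚.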
Tasks also have a set of counters that count various events as the task runs, either those built into the framework, such as the number of map output records written, or ones defined by users.
If a task reports progress, it sets a flag to indicate that the status change should be sent to the tasktracker. The flag is checked in a separate thread every three seconds, and if set it notifies the tasktracker of the current task status. Meanwhile, the tasktracker is sending heartbeats to the jobtracker every five seconds (this is a minimum, as the heartbeat interval is actually dependent on the size of the cluster: for larger clusters, the interval is longer), and the status of all the tasks being run by the tasktracker is sent in the call. Counters are sent less frequently than every five seconds, because they can be relatively high-bandwidth.
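The flag-and-checker pattern can be sketched with an AtomicBoolean and a scheduled executor. The ProgressReporter class is hypothetical, and the DoubleConsumer stands in for the real call to the tasktracker over the umbilical interface.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.DoubleConsumer;

class ProgressReporter {
    private final AtomicBoolean changed = new AtomicBoolean(false);
    private volatile double progress;
    private final DoubleConsumer sendToTracker;  // stands in for the umbilical call

    ProgressReporter(DoubleConsumer sendToTracker) {
        this.sendToTracker = sendToTracker;
    }

    // Task code may call this as often as it likes: it only records and flags.
    void report(double p) {
        progress = p;
        changed.set(true);
    }

    // One check: if the flag was set, clear it and send the latest value.
    void checkAndSend() {
        if (changed.getAndSet(false)) {
            sendToTracker.accept(progress);
        }
    }

    // Run checkAndSend on a separate thread every periodMillis (3000 in the text).
    ScheduledExecutorService startChecker(long periodMillis) {
        ScheduledExecutorService ex = Executors.newSingleThreadScheduledExecutor();
        ex.scheduleAtFixedRate(this::checkAndSend, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return ex;
    }
}
```

Because report() is so cheap, user code can report progress per record without flooding the tasktracker; at most one update leaves the task per check period.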
The jobtracker combines these updates to produce a global view
of the status of all the jobs being run and their constituent tasks.
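One way to picture the jobtracker’s global view is a table of the latest status reported for each task, from which job-level progress is derived. Averaging over all of a job’s tasks, counting unreported tasks as zero, is an assumption made for this sketch, as are the JobStatusAggregator name and its methods.

```java
import java.util.HashMap;
import java.util.Map;

class JobStatusAggregator {
    private final Map<String, Double> latestMap = new HashMap<>();
    private final Map<String, Double> latestReduce = new HashMap<>();
    private final int totalMaps;
    private final int totalReduces;

    JobStatusAggregator(int totalMaps, int totalReduces) {
        this.totalMaps = totalMaps;
        this.totalReduces = totalReduces;
    }

    // Apply one task status from a heartbeat; a newer report replaces the older one.
    void update(String taskId, boolean isMap, double progress) {
        (isMap ? latestMap : latestReduce).put(taskId, progress);
    }

    double mapProgress() {
        return mean(latestMap, totalMaps);
    }

    double reduceProgress() {
        return mean(latestReduce, totalReduces);
    }

    // Mean over all tasks of one kind; tasks that have not reported count as 0.
    private static double mean(Map<String, Double> latest, int total) {
        if (total == 0) {
            return 1.0;  // no tasks of this kind: treated as finished in this sketch
        }
        double sum = 0;
        for (double v : latest.values()) {
            sum += v;
        }
        return sum / total;
    }
}
```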
Finally, as mentioned earlier, the
JobClient receives the latest status by
polling the jobtracker every second. Clients can also use
JobClient’s getJob() method to obtain a
RunningJob instance, which contains all of
the status information for the job.
The method calls are illustrated in Figure 6.3.
When the jobtracker receives a notification that the last task
for a job is complete, it changes the status for the job to
“successful.” Then, when the
JobClient polls for
status, it learns that the job has completed successfully, so it
prints a message to tell the user and then returns from the
runJob() method.
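The completion rule can be sketched as a set of outstanding tasks: the notification that empties the set flips the job’s state, and the next client poll observes it. The JobCompletionTracker class and its State values are hypothetical; failure handling is left out.

```java
import java.util.HashSet;
import java.util.Set;

class JobCompletionTracker {
    enum State { RUNNING, SUCCEEDED }

    private final Set<String> outstanding;
    private State state = State.RUNNING;

    JobCompletionTracker(Set<String> taskIds) {
        this.outstanding = new HashSet<>(taskIds);
    }

    // Jobtracker side: the notification for the last outstanding task
    // switches the job to SUCCEEDED. Repeated notifications are harmless.
    synchronized void taskCompleted(String taskId) {
        outstanding.remove(taskId);
        if (outstanding.isEmpty()) {
            state = State.SUCCEEDED;
        }
    }

    // Client side: what the next status poll observes.
    synchronized State poll() {
        return state;
    }
}
```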
Learn more about this topic from Hadoop: The Definitive Guide.