Sunday, April 5, 2015

HDFS Command

1. File System Check (fsck)

Like its disk filesystem cousin, HDFS’s fsck command understands blocks. For example,
running:
% hadoop fsck / -files -blocks
will list the blocks that make up each file in the filesystem.

There are two properties that we set in the pseudodistributed configuration that deserve
further explanation. The first is fs.default.name, set to hdfs://localhost/, which is used
to set a default filesystem for Hadoop. Filesystems are specified by a URI, and here we
have used an hdfs URI to configure Hadoop to use HDFS by default. The HDFS daemons
will use this property to determine the host and port for the HDFS namenode.
We’ll be running it on localhost, on the default HDFS port, 8020. And HDFS clients
will use this property to work out where the namenode is running so they can connect
to it.
We set the second property, dfs.replication, to 1 so that HDFS doesn’t replicate
filesystem blocks by the default factor of three. When running with a single datanode,
HDFS can’t replicate blocks to three datanodes, so it would perpetually warn about
blocks being under-replicated. This setting solves that problem

Why is a block in HDFS so large (Default 64 MB)

Why Is a Block in HDFS So Large?
HDFS blocks are large compared to disk blocks, and the reason is to minimize the cost
of seeks. By making a block large enough, the time to transfer the data from the disk
can be significantly longer than the time to seek to the start of the block. Thus the time
to transfer a large file made of multiple blocks operates at the disk transfer rate.
A quick calculation shows that if the seek time is around 10 ms and the transfer rate is
100 MB/s, to make the seek time 1% of the transfer time, we need to make the block
size around 100 MB. The default is actually 64 MB, although many HDFS installations
use 128 MB blocks. This figure will continue to be revised upward as transfer speeds
grow with new generations of disk drives.
This argument shouldn’t be taken too far, however. Map tasks in MapReduce normally
operate on one block at a time, so if you have too few tasks (fewer than nodes in the
cluster), your jobs will run slower than they could otherwise.

Data Flow

Data Flow

First, some terminology. A MapReduce job is a unit of work that the client wants to be
performed: it consists of the input data, the MapReduce program, and configuration
information. Hadoop runs the job by dividing it into tasks, of which there are two types:
map tasks and reduce tasks.
There are two types of nodes that control the job execution process: a jobtracker and
a number of tasktrackers. The jobtracker coordinates all the jobs run on the system by
scheduling tasks to run on tasktrackers. Tasktrackers run tasks and send progress
reports to the jobtracker, which keeps a record of the overall progress of each job. If a
task fails, the jobtracker can reschedule it on a different tasktracker.
Hadoop divides the input to a MapReduce job into fixed-size pieces called input
splits, or just splits. Hadoop creates one map task for each split, which runs the userdefined
map function for each record in the split.

Having many splits means the time taken to process each split is small compared to the
time to process the whole input. So if we are processing the splits in parallel, the processing
is better load-balanced when the splits are small, since a faster machine will be
able to process proportionally more splits over the course of the job than a slower
machine. Even if the machines are identical, failed processes or other jobs running
concurrently make load balancing desirable, and the quality of the load balancing increases
as the splits become more fine-grained.
On the other hand, if splits are too small, the overhead of managing the splits and of
map task creation begins to dominate the total job execution time. For most jobs, a
good split size tends to be the size of an HDFS block, 64 MB by default, although this
can be changed for the cluster (for all newly created files) or specified when each file is
created.
Hadoop does its best to run the map task on a node where the input data resides in
HDFS. This is called the data locality optimization because it doesn’t use valuable cluster
bandwidth. Sometimes, however, all three nodes hosting the HDFS block replicas
for a map task’s input split are running other map tasks, so the job scheduler will look
for a free map slot on a node in the same rack as one of the blocks. Very occasionally
even this is not possible, so an off-rack node is used, which results in an inter-rack
network transfer. The three possibilities are illustrated in Figure 2-2.


Figure 2-2. Data-local (a), rack-local (b), and off-rack (c) map tasks
It should now be clear why the optimal split size is the same as the block size: it is the
largest size of input that can be guaranteed to be stored on a single node. If the split
spanned two blocks, it would be unlikely that any HDFS node stored both blocks, so
some of the split would have to be transferred across the network to the node running

the map task, which is clearly less efficient than running the whole map task using local
data.
Map tasks write their output to the local disk, not to HDFS. Why is this? Map output
is intermediate output: it’s processed by reduce tasks to produce the final output, and
once the job is complete, the map output can be thrown away. So storing it in HDFS
with replication would be overkill. If the node running the map task fails before the
map output has been consumed by the reduce task, then Hadoop will automatically
rerun the map task on another node to re-create the map output.
Reduce tasks don’t have the advantage of data locality; the input to a single reduce task
is normally the output from all mappers. In the present example, we have a single reduce
task that is fed by all of the map tasks. Therefore, the sorted map outputs have to be
transferred across the network to the node where the reduce task is running, where
they are merged and then passed to the user-defined reduce function. The output of
the reduce is normally stored in HDFS for reliability. As explained in Chapter 3, for
each HDFS block of the reduce output, the first replica is stored on the local node, with
other replicas being stored on off-rack nodes. Thus, writing the reduce output does
consume network bandwidth, but only as much as a normal HDFS write pipeline
consumes.
The whole data flow with a single reduce task is illustrated in Figure 2-3. The dotted
boxes indicate nodes, the light arrows show data transfers on a node, and the heavy
arrows show data transfers between nodes.
Figure 2-3. MapReduce data flow with a single reduce task
32 | Chapter 2: MapReduce
The number of reduce tasks is not governed by the size of the input, but instead is
specified independently. In “The Default MapReduce Job” on page 227, you will see
how to choose the number of reduce tasks for a given job.
When there are multiple reducers, the map tasks partition their output, each creating
one partition for each reduce task. There can be many keys (and their associated values)
in each partition, but the records for any given key are all in a single partition. The
partitioning can be controlled by a user-defined partitioning function, but normally the
default partitioner—which buckets keys using a hash function—works very well.
The data flow for the general case of multiple reduce tasks is illustrated in Figure 2-4.
This diagram makes it clear why the data flow between map and reduce tasks is colloquially
known as “the shuffle,” as each reduce task is fed by many map tasks. The
shuffle is more complicated than this diagram suggests, and tuning it can have a big
impact on job execution time, as you will see in “Shuffle and Sort” on page 208.
Figure 2-4. MapReduce data flow with multiple reduce tasks
Finally, it’s also possible to have zero reduce tasks. This can be appropriate when you
don’t need the shuffle because the processing can be carried out entirely in parallel (a
few examples are discussed in “NLineInputFormat” on page 247). In this case, the
only off-node data transfer is when the map tasks write to HDFS (see Figure 2-5).

Saturday, April 4, 2015

Difference between Old and New API

There are several notable differences between the two APIs:
 • The new API favors abstract classes over interfaces, since these are easier to evolve.
   This means that you can add a method (with a default implementation) to an
   abstract class without breaking old implementations of the class.1 For example,
   the Mapper and Reducer interfaces in the old API are abstract classes in the new API.
• The new API is in the org.apache.hadoop.mapreduce package (and subpackages).
  The old API can still be found in org.apache.hadoop.mapred.
• The new API makes extensive use of context objects that allow the user code to
  communicate with the MapReduce system. The new Context, for example, essen-
  tially unifies the role of the JobConf, the OutputCollector, and the Reporter from
  the old API.
• In both APIs, key-value record pairs are pushed to the mapper and reducer, but in
  addition, the new API allows both mappers and reducers to control the execution
  flow by overriding the run() method. For example, records can be processed in
  batches, or the execution can be terminated before all the records have been pro-
  cessed. In the old API this is possible for mappers by writing a MapRunnable, but no
  equivalent exists for reducers.
• Job control is performed through the Job class in the new API, rather than the old
  JobClient, which no longer exists in the new API.
• Configuration has been unified. The old API has a special JobConf object for job
  configuration, which is an extension of Hadoop’s vanilla Configuration object
  (used for configuring daemons; see “The Configuration API” on page 144). In the
  new API, job configuration is done through a Configuration, possibly via some of
  the helper methods on Job.
• Output files are named slightly differently: in the old API both map and reduce
  outputs are named part-nnnnn, whereas in the new API map outputs are named
  part-m-nnnnn, and reduce outputs are named part-r-nnnnn (where nnnnn is an integer
  designating the part number, starting from zero).
• User-overridable methods in the new API are declared to throw java.lang.Inter
  ruptedException. This means that you can write your code to be responsive to
  interrupts so that the framework can gracefully cancel long-running operations if
  it needs to.2
• In the new API, the reduce() method passes values as a java.lang.Iterable, rather
  than a java.lang.Iterator (as the old API does). This change makes it easier to
  iterate over the values using Java’s for-each loop construct:
       for (VALUEIN value : values) { ... }

Map and Reduce

Map and Reduce

MapReduce works by breaking the processing into two phases: the map phase and the reduce phase. Each phase has key-value pairs as input and output, the types of which may be chosen by the programmer. The programmer also specifies two functions: the map function and the reduce function.

Three Problems which a framework like hadoop solves

Three problems which a framework like hadoop solve (Or why go with hadoop):

1. First, dividing the work into equal-size pieces isn’t always easy or obvious. In this case,
the file size for different years varies widely, so some processes will finish much earlier
than others. Even if they pick up further work, the whole run is dominated by the
longest file. A better approach, although one that requires more work, is to split the
input into fixed-size chunks and assign each chunk to a process.

2.Second, combining the results from independent processes may need further process-
ing. In this case, the result for each year is independent of other years and may be
combined by concatenating all the results and sorting by year. If using the fixed-size
chunk approach, the combination is more delicate. For this example, data for a par-
ticular year will typically be split into several chunks, each processed independently.
We’ll end up with the maximum temperature for each chunk, so the final step is to
look for the highest of these maximums for each year.

3.Third, you are still limited by the processing capacity of a single machine. If the best
time you can achieve is 20 minutes with the number of processors you have, then that’s
it. You can’t make it go faster. Also, some datasets grow beyond the capacity of a single
machine. When we start using multiple machines, a whole host of other factors come
into play, mainly falling into the category of coordination and reliability. Who runs the
overall job? How do we deal with failed processes?

So, although it’s feasible to parallelize the processing, in practice it’s messy. Using a framework like Hadoop to take care of these issues is a great help.