8.2 Does Hadoop replace databases or other existing systems?
No.
Hadoop is not a database nor does it need to replace any existing data systems you may have. Hadoop is a massively scalable storage and batch data processing system. It provides an integrated storage and processing fabric that scales horizontally with commodity hardware and provides fault tolerance through software. Rather than replace existing systems, Hadoop augments them by offloading the particularly difficult problem of simultaneously ingesting, processing and delivering/exporting large volumes of data so existing systems can focus on what they were designed to do: whether that be serve real time transactional data or provide interactive business intelligence. Furthermore, Hadoop can absorb any type of data, structured or not, from any number of sources. Data from many sources can be joined and aggregated in arbitrary ways enabling deeper analyses than any one system can provide. Lastly, these results can be delivered to any existing enterprise system for further use independent of Hadoop.
For example, consider an RDBMS used for serving real-time data and ensuring transactional consistency. Asking that same database to generate complex analytical reports over large volumes of data is performance intensive and detracts from its ability to do what it does well. Hadoop is designed to store vast volumes of data, process that data in arbitrary ways, and deliver that data to wherever it is needed. By regularly exporting data from an RDBMS, you can tune your database for its interactive workload and use Hadoop to conduct arbitrarily complex analysis offline without impacting your real-time systems.
8.3 How does MapReduce relate to Hadoop and other systems?
Hadoop is an open source implementation of both the MapReduce programming model, and the underlying file system Google developed to support web scale data.
The MapReduce programming model was designed by Google to enable a clean abstraction between large scale data analysis tasks and the underlying systems challenges involved in ensuring reliable large-scale computation. By adhering to the MapReduce model, your data processing job can be easily parallelized and the programmer doesn’t have to think about the system level details of synchronization, concurrency, hardware failure, etc.
What limits the scalability of any MapReduce implementation is the underlying system storing your data and feeding it to MapReduce.
Google chose not to use a RDBMS for this because the overhead incurred from maintaining indexes, relationships and transactional guarantees isn’t necessary for batch processing. Rather, they designed a new distributed file system, from the ground up, that worked well with “shared nothing” architecture principles. They purposefully abandoned indexes, relationships and transactional guarantees because they limit horizontal scalability and slow down loading and batch data processing for semi and unstructured data.
Some RDBMSs provide MapReduce functionality. In doing so, they provide an easy way for programmers to create more expressive queries than SQL allows in a way that does not induce any additional scalability constraints on the database. However, MapReduce alone does not address the fundamental challenge of scaling out a RDBMS in a horizontal manner.
If you need indexes, relationships and transactional guarantees, you need a database. If you need a database, one that supports MapReduce will allow for more expressive analysis than one that does not.
However, if your primary need is a highly scalable storage and batch data processing system, you will often find that Hadoop utilizes commodity resources effectively to deliver a lower cost per TB for data storage and processing.
8.4 How do existing systems interact with Hadoop?
Hadoop often serves as a sink for many sources of data because Hadoop allows you to store data cost effectively and process that data in arbitrary ways at a later time. Because Hadoop doesn’t maintain indexes or relationships, you don’t need to decide how you want analyze your data in advance. Let’s look at how various systems get data into Hadoop.
Databases: Hadoop has native support for extracting data over JDBC. Many databases also have bulk dump / load functionality. In either case, depending on the type of data, it is easy to dump the entire database to Hadoop on a regular basis, or just export the updates since the last dump. Often, you will find that by dumping data to Hadoop regularly, you can store less data in your interactive systems and lower your licensing costs going forward.
Log Generators: Many systems, from web servers to sensors, generate log data and some do so at astonishing rates. These log records often have a semi-structured nature and change over time. Analyzing these logs is often difficult because they don’t “fit” nicely in relational databases and take too long to process on a single machine. Hadoop makes it easy for any number of systems to reliably stream any volume of logs to a central repository for later analysis. You often see users dump each day’s logs into a new directory so they can easily run analysis over any arbitrary time-frame’s worth of logs.
Scientific Equipment: As sensor technology improves, we see many scientific devices ranging from imagery systems (medical, satellite, etc) to DNA sequencers to high energy physics detectors, generating data at rates that vastly exceed both the write speed and capacity of a single disk. These systems can write data directly to Hadoop and as the data generation rate and processing demands increase, they can be addressed simply adding more commodity hardware to your Hadoop cluster.
Hadoop is agnostic to what type of data you store. It breaks data into manageable chunks, replicates them, and distributes multiple copies across all the nodes in your cluster so you can process your data quickly and reliably later. You can now conduct analysis that consumes all of your data. These aggregates, summaries, and reports can be exported to any other system using raw files, JDBC, or custom connectors.
8.5 How can users across an organization interact with Hadoop?
One of the great things about Hadoop is that it exposes massive amounts of data to a diverse set of users across your organization. It creates and drives a culture of data, which in turn empowers individuals at every level to make better business decisions.
When a DBA designs and optimizes a database, she considers many factors. First and foremost is the structure of the data, the access patterns for the data, and the views / reports required of the data. These early decisions limit the types of queries the database can respond to efficiently. As business users request more views on the data, it becomes a constant struggle to maintain performance and deliver new reports in timely manner. Hadoop enables a DBA to optimize the database for its primary workload, and regularly export the data for analytical purposes.
Once data previously locked up in database systems is available for easy processing, programmers can transform this data in any number of ways. They can craft more expressive queries and generate more data / cpu intensive reports without impacting production database performance. They can build pipelines that leverage data from many sources for research, development, and business processes. Programmers may find our online Hadoop training useful, especially Programming with Hadoop.
But working closely with data doesn’t stop with DBAs and programmers. By providing simple high-level interfaces, Hadoop enables less technical users to ask quick, ad-hoc questions about any data in the enterprise. This enables everyone from product managers, to analysts, to executives to participate in, and drive a culture focused on data. For more details, check out our online training for Hive (a data warehouse for Hadoop with an SQL interface) and Pig (a high level language for ad-hoc analysis).
8.6 How do I understand and predict the cost of running Hadoop?
One of the nice things about Hadoop is that understanding your costs in advance is relatively simple. Hadoop is free software and it runs on commodity hardware, which includes cloud providers like Amazon. It has been demonstrated to scale beyond tens of petabytes (PB). More importantly, it does so with linear performance characteristics and cost.
Hadoop uses commodity hardware, so every month, your costs decrease or provide more capacity for the same price point. You probably have a vendor you like, and they probably sell a dual quad-core (8 cores total) machine with 4 1TB SATA disks (you specifically don’t want RAID). Your budget and workload will decide whether you want 8 or 16GB of RAM per machine. Hadoop uses thee-way replication for data durability, so your “usable” capacity will roughly be your raw disk capacity divided by 3. For machines with 4×1TB disks, 1 TB is a good estimate for usable space, as it leaves some overhead for intermediate data and the like. It also makes the math really easy. If you use EC2, two extra large instances provide about 1 usable TB.
Armed with your initial data size, the growth rate of that data, and the cost per usable TB, you should be able to estimate the cost of your Hadoop cluster. You will also incur operational costs, but, because the software is common across all the machines, and requires little per-machine tuning, the operational cost scales sub-linearly.
To make things even easier, Cloudera provides our distribution for both local deployments and EC2 free of charge under the Apache 2.0 license. You can learn more at http://www.cloudera.com/hadoop
Monday, August 16, 2010
Fundamental Assumptions for Hadoop
When Google began ingesting and processing the entire web on a regular basis, no existing system was up for the task. Managing and processing data at this scale was simply never considered before.
To address the massive scale of data first introduced by the web, but now commonplace in many industries, Google built systems from the ground up to reliably store and process petabytes of
Assumption 1: Hardware can be reliable.
It is true that you can pay a significant premium for hardware with a mean time to failure (MTTF) that exceeds its expected lifespan. However, working with web scale data requires thousands of disks and servers. Even an MTTF of 4 years results in nearly 5 failures per week in a cluster of 1,000 nodes. For a fraction of the cost, using commodity hardware with an MTTF of 2 years, you can expect just shy of 10 failures per week. In the big scheme of things, these scenarios are nearly identical, and both require fundamentally rethinking fault tolerance. In order to provide reliable storage and computation at scale, fault tolerance must be provided through software. When this is achieved, the economics of “reliable” hardware quickly fall apart.
Assumption 2: Machines have identities.
Once you accept that all machines will eventually fail, you need to stop thinking of them as individuals with identities, or you will quickly find yourself trying to identify a machine which no longer exists. It is obvious that if you want to leverage many machines to accomplish a task, they must be able to communicate with each other. This is true, but to deal effectively with the reality of unreliable hardware, communication must be implicit. It must not depend on machine X sending some data Y to machine Z, but rather some machine saying that some other machine must process some data Y. If you maintain explicit communication at scale, you face a verification problem at least as large as your data processing problem. This shift from explicit to implicit communication allows the underlying software system to reliably ensure data is stored and processed without requiring the programmer to verify successful communication, or more importantly, allow them to make mistakes doing so.
Assumption 3: A data set can be stored on a single machine.
When working with large amounts of data, we are quickly confronted with data sets that exceed the capacity of a single disk and are intractable for a single processor. This requires changing our assumptions about how data is stored and processed. A large data set can actually be stored in many pieces across many machines to facilitate parallel computation. If each machine in your cluster stores a small piece of each data set, any machine can process part of any data set by reading from its local disk. When many machines are used in parallel, you can process your entire data set by pushing the computation to the data and thus conserving precious network bandwidth.
To address the massive scale of data first introduced by the web, but now commonplace in many industries, Google built systems from the ground up to reliably store and process petabytes of
Assumption 1: Hardware can be reliable.
It is true that you can pay a significant premium for hardware with a mean time to failure (MTTF) that exceeds its expected lifespan. However, working with web scale data requires thousands of disks and servers. Even an MTTF of 4 years results in nearly 5 failures per week in a cluster of 1,000 nodes. For a fraction of the cost, using commodity hardware with an MTTF of 2 years, you can expect just shy of 10 failures per week. In the big scheme of things, these scenarios are nearly identical, and both require fundamentally rethinking fault tolerance. In order to provide reliable storage and computation at scale, fault tolerance must be provided through software. When this is achieved, the economics of “reliable” hardware quickly fall apart.
Assumption 2: Machines have identities.
Once you accept that all machines will eventually fail, you need to stop thinking of them as individuals with identities, or you will quickly find yourself trying to identify a machine which no longer exists. It is obvious that if you want to leverage many machines to accomplish a task, they must be able to communicate with each other. This is true, but to deal effectively with the reality of unreliable hardware, communication must be implicit. It must not depend on machine X sending some data Y to machine Z, but rather some machine saying that some other machine must process some data Y. If you maintain explicit communication at scale, you face a verification problem at least as large as your data processing problem. This shift from explicit to implicit communication allows the underlying software system to reliably ensure data is stored and processed without requiring the programmer to verify successful communication, or more importantly, allow them to make mistakes doing so.
Assumption 3: A data set can be stored on a single machine.
When working with large amounts of data, we are quickly confronted with data sets that exceed the capacity of a single disk and are intractable for a single processor. This requires changing our assumptions about how data is stored and processed. A large data set can actually be stored in many pieces across many machines to facilitate parallel computation. If each machine in your cluster stores a small piece of each data set, any machine can process part of any data set by reading from its local disk. When many machines are used in parallel, you can process your entire data set by pushing the computation to the data and thus conserving precious network bandwidth.
Hadoop Map Reduce Implementation
Hadoop Map Reduce Implementation
We will describe about the methodology of transforming a sequential algorithm into parallel. After that, we can implement the parallel algorithm, one of the popular framework we can use is the Apache Opensource Hadoop Map/Reduce framework.
7.1 Functional Programming
Multithreading is one of the popular way of doing parallel programming, but major complexity of multi-thread programming is to co-ordinate the access of each thread to the shared data. We need things like semaphores, locks, and also use them with great care, otherwise dead locks will result.
If we can eliminate the shared state completely, then the complexity of co-ordination will disappear. This is the fundamental concept of functional programming. Data is explicitly passed between functions as parameters or return values which can only be changed by the active function at that moment. Imagine functions are connected to each other via a directed acyclic graph. Since there is no hidden dependency (via shared state), functions in the DAG can run anywhere in parallel as long as one is not an ancestor of the other. In other words, analyze the parallelism is much easier when there is no hidden dependency from shared state.
7.2 Map/Reduce Functions
Map/reduce is a special form of such a DAG which is applicable in a wide range of use cases. It is organized as a “map” function which transform a piece of data into some number of key/value pairs. Each of these elements will then be sorted by their key and reach to the same node, where a “reduce” function is use to merge the values (of the same key) into a single result.
map(input_record) {
…
emit(k1, v1)
…
emit(k2, v2)
…
}
reduce (key, values) {
aggregate = initialize()
while (values.has_next) {
aggregate = merge(values.next)
}
collect(key, aggregate)
The Map/Reduce DAG is organized in this way.
A parallel algorithm is usually structure as multiple rounds of Map/Reduce
7.3 Distributed File Systems
The distributed file system is designed to handle large files (multi-GB) with sequential read/write operation. Each file is broken into chunks, and stored across multiple data nodes as local OS files.
There is a master “NameNode” to keep track of overall file directory structure and the placement of chunks. This NameNode is the central control point and may re-distributed replicas as needed.
To read a file, the client API will calculate the chunk index based on the offset of the file pointer and make a request to the NameNode. The NameNode will reply which DataNodes has a copy of that chunk. From this points, the client contacts the DataNode directly without going through the NameNode.
To write a file, client API will first contact the NameNode who will designate one of the replica as the primary (by granting it a lease). The response of the NameNode contains who is the primary and who are the secondary replicas. Then the client push its changes to all DataNodes in any order, but this change is stored in a buffer of each DataNode. After changes are buffered at all DataNodes, the client send a “commit” request to the primary, which determines an order to update and then push this order to all other secondaries. After all secondaries complete the commit, the primary will response to the client about the success.
All changes of chunk distribution and metadata changes will be written to an operation log file at the NameNode. This log file maintain an order list of operation which is important for the NameNode to recover its view after a crash. The NameNode also maintain its persistent state by regularly check-pointing to a file.
In case of the NameNode crash, all lease granting operation will fail and so any write operation is effectively fail also. Read operation should continuously to work as long as the clinet program has a handle to the DataNode. To recover from NameNode crash, a new NameNode can take over after restoring the state from the last checkpoint file and replay the operation log.
When a DataNode crashes, it will be detected by the NameNode after missing its hearbeat for a while. The NameNode removes the crashed DataNode from the cluster and spread its chunks to other surviving DataNodes. This way, the replication factor of each chunk will be maintained across the cluster.
Later when the DataNode recover and rejoin the cluster, it reports all its chunks to the NameNode at boot time. Each chunk has a version number which will advanced at each update. Therefore, the NameNode can easily figure out if any of the chunks of a DataNode becomes stale. Those stale chunks will be garbage collected at a later time.
7.4 Job Execution
Hadoop MapRed is based on a “pull” model where multiple “TaskTrackers” poll the “JobTracker” for tasks (either map task or reduce task).
The job execution starts when the client program uploading three files: “job.xml” (the job config including map, combine, reduce function and input/output data path, etc.), “job.split” (specifies how many splits and range based on dividing files into ~16 – 64 MB size), “job.jar” (the actual Mapper and Reducer implementation classes) to the HDFS location (specified by the “mapred.system.dir” property in the “hadoop-default.conf” file). Then the client program notifies the JobTracker about the Job submission. The JobTracker returns a Job id to the client program and starts allocating map tasks to the idle TaskTrackers when they poll for tasks.
Each TaskTracker has a defined number of "task slots" based on the capacity of the machine. There are heartbeat protocol allows the JobTracker to know how many free slots from each TaskTracker. The JobTracker will determine appropriate jobs for the TaskTrackers based on how busy thay are, their network proximity to the data sources (preferring same node, then same rack, then same network switch). The assigned TaskTrackers will fork a MapTask (separate JVM process) to execute the map phase processing. The MapTask extracts the input data from the splits by using the “RecordReader” and “InputFormat” and it invokes the user provided “map” function which emits a number of key/value pair in the memory buffer.
When the buffer is full, the output collector will spill the memory buffer into disk. For optimizing the network bandwidth, an optional “combine” function can be invoked to partially reduce values of each key. Afterwards, the “partition” function is invoked on each key to calculate its reducer node index. The memory buffer is eventually flushed into 2 files, the first index file contains an offset pointer of each partition. The second data file contains all records sorted by partition and then by key.
When the map task has finished executing all input records, it start the commit process, it first flush the in-memory buffer (even it is not full) to the index + data file pair. Then a merge sort for all index + data file pairs will be performed to create a single index + data file pair.
The index + data file pair will then be splitted into are R local directories, one for each partition. After all the MapTask completes (all splits are done), the TaskTracker will notify the JobTracker which keeps track of the overall progress of job. JobTracker also provide a web interface for viewing the job status.
When the JobTracker notices that some map tasks are completed, it will start allocating reduce tasks to subsequent polling TaskTrackers (there are R TaskTrackers will be allocated for reduce task). These allocated TaskTrackers remotely download the region files (according to the assigned reducer index) from the completed map phase nodes and concatenate (merge sort) them into a single file. Whenever more map tasks are completed afterwards, JobTracker will notify these allocated TaskTrackers to download more region files (merge with previous file). In this manner, downloading region files are interleaved with the map task progress. The reduce phase is not started at this moment yet.
Eventually all the map tasks are completed. The JobTracker then notifies all the allocated TaskTrackers to proceed to the reduce phase. Each allocated TaskTracker will fork a ReduceTask (separate JVM) to read the downloaded file (which is already sorted by key) and invoke the “reduce” function, which collects the key/aggregatedValue into the final output file (one per reducer node). Note that each reduce task (and map task as well) is single-threaded. And this thread will invoke the reduce(key, values) function in assending (or descending) order of the keys assigned to this reduce task. This provides an interesting property that all entries written by the reduce() function is sorted in increasing order. The output of each reducer is written to a temp output file in HDFS. When the reducer finishes processing all keys, the temp output file will be renamed atomically to its final output filename.
The Map/Reduce framework is resilient to crashes of any components. TaskTracker nodes periodically report their status to the JobTracker which keeps track of the overall job progress. If the JobTracker hasn’t heard from any TaskTracker nodes for a long time, it assumes the TaskTracker node has been crashed and will reassign its tasks appropriately to other TaskTracker nodes. Since the map phase result is stored in the local disk, which will not be available when the TaskTracker node crashes. In case a map-phase TaskTracker node crashes, the crashed MapTasks (regardless of whether it is complete or not) will be reassigned to a different TaskTracker node, which will rerun all the assigned splits. However, the reduce phase result is stored in HDFS, which is available even the TaskTracker node crashes. Therefore, in case a reduce-phase TaskTracker node crashes, only the incomplete ReduceTasks need to be reassigned to a different TaskTracker node, where the incompleted reduce tasks will be re-run.
The job submission process is asynchronous. Client program can poll for the job status at any time by supplying the job id.
We will describe about the methodology of transforming a sequential algorithm into parallel. After that, we can implement the parallel algorithm, one of the popular framework we can use is the Apache Opensource Hadoop Map/Reduce framework.
7.1 Functional Programming
Multithreading is one of the popular way of doing parallel programming, but major complexity of multi-thread programming is to co-ordinate the access of each thread to the shared data. We need things like semaphores, locks, and also use them with great care, otherwise dead locks will result.
If we can eliminate the shared state completely, then the complexity of co-ordination will disappear. This is the fundamental concept of functional programming. Data is explicitly passed between functions as parameters or return values which can only be changed by the active function at that moment. Imagine functions are connected to each other via a directed acyclic graph. Since there is no hidden dependency (via shared state), functions in the DAG can run anywhere in parallel as long as one is not an ancestor of the other. In other words, analyze the parallelism is much easier when there is no hidden dependency from shared state.
7.2 Map/Reduce Functions
Map/reduce is a special form of such a DAG which is applicable in a wide range of use cases. It is organized as a “map” function which transform a piece of data into some number of key/value pairs. Each of these elements will then be sorted by their key and reach to the same node, where a “reduce” function is use to merge the values (of the same key) into a single result.
map(input_record) {
…
emit(k1, v1)
…
emit(k2, v2)
…
}
reduce (key, values) {
aggregate = initialize()
while (values.has_next) {
aggregate = merge(values.next)
}
collect(key, aggregate)
The Map/Reduce DAG is organized in this way.
A parallel algorithm is usually structure as multiple rounds of Map/Reduce
7.3 Distributed File Systems
The distributed file system is designed to handle large files (multi-GB) with sequential read/write operation. Each file is broken into chunks, and stored across multiple data nodes as local OS files.
There is a master “NameNode” to keep track of overall file directory structure and the placement of chunks. This NameNode is the central control point and may re-distributed replicas as needed.
To read a file, the client API will calculate the chunk index based on the offset of the file pointer and make a request to the NameNode. The NameNode will reply which DataNodes has a copy of that chunk. From this points, the client contacts the DataNode directly without going through the NameNode.
To write a file, client API will first contact the NameNode who will designate one of the replica as the primary (by granting it a lease). The response of the NameNode contains who is the primary and who are the secondary replicas. Then the client push its changes to all DataNodes in any order, but this change is stored in a buffer of each DataNode. After changes are buffered at all DataNodes, the client send a “commit” request to the primary, which determines an order to update and then push this order to all other secondaries. After all secondaries complete the commit, the primary will response to the client about the success.
All changes of chunk distribution and metadata changes will be written to an operation log file at the NameNode. This log file maintain an order list of operation which is important for the NameNode to recover its view after a crash. The NameNode also maintain its persistent state by regularly check-pointing to a file.
In case of the NameNode crash, all lease granting operation will fail and so any write operation is effectively fail also. Read operation should continuously to work as long as the clinet program has a handle to the DataNode. To recover from NameNode crash, a new NameNode can take over after restoring the state from the last checkpoint file and replay the operation log.
When a DataNode crashes, it will be detected by the NameNode after missing its hearbeat for a while. The NameNode removes the crashed DataNode from the cluster and spread its chunks to other surviving DataNodes. This way, the replication factor of each chunk will be maintained across the cluster.
Later when the DataNode recover and rejoin the cluster, it reports all its chunks to the NameNode at boot time. Each chunk has a version number which will advanced at each update. Therefore, the NameNode can easily figure out if any of the chunks of a DataNode becomes stale. Those stale chunks will be garbage collected at a later time.
7.4 Job Execution
Hadoop MapRed is based on a “pull” model where multiple “TaskTrackers” poll the “JobTracker” for tasks (either map task or reduce task).
The job execution starts when the client program uploading three files: “job.xml” (the job config including map, combine, reduce function and input/output data path, etc.), “job.split” (specifies how many splits and range based on dividing files into ~16 – 64 MB size), “job.jar” (the actual Mapper and Reducer implementation classes) to the HDFS location (specified by the “mapred.system.dir” property in the “hadoop-default.conf” file). Then the client program notifies the JobTracker about the Job submission. The JobTracker returns a Job id to the client program and starts allocating map tasks to the idle TaskTrackers when they poll for tasks.
Each TaskTracker has a defined number of "task slots" based on the capacity of the machine. There are heartbeat protocol allows the JobTracker to know how many free slots from each TaskTracker. The JobTracker will determine appropriate jobs for the TaskTrackers based on how busy thay are, their network proximity to the data sources (preferring same node, then same rack, then same network switch). The assigned TaskTrackers will fork a MapTask (separate JVM process) to execute the map phase processing. The MapTask extracts the input data from the splits by using the “RecordReader” and “InputFormat” and it invokes the user provided “map” function which emits a number of key/value pair in the memory buffer.
When the buffer is full, the output collector will spill the memory buffer into disk. For optimizing the network bandwidth, an optional “combine” function can be invoked to partially reduce values of each key. Afterwards, the “partition” function is invoked on each key to calculate its reducer node index. The memory buffer is eventually flushed into 2 files, the first index file contains an offset pointer of each partition. The second data file contains all records sorted by partition and then by key.
When the map task has finished executing all input records, it start the commit process, it first flush the in-memory buffer (even it is not full) to the index + data file pair. Then a merge sort for all index + data file pairs will be performed to create a single index + data file pair.
The index + data file pair will then be splitted into are R local directories, one for each partition. After all the MapTask completes (all splits are done), the TaskTracker will notify the JobTracker which keeps track of the overall progress of job. JobTracker also provide a web interface for viewing the job status.
When the JobTracker notices that some map tasks are completed, it will start allocating reduce tasks to subsequent polling TaskTrackers (there are R TaskTrackers will be allocated for reduce task). These allocated TaskTrackers remotely download the region files (according to the assigned reducer index) from the completed map phase nodes and concatenate (merge sort) them into a single file. Whenever more map tasks are completed afterwards, JobTracker will notify these allocated TaskTrackers to download more region files (merge with previous file). In this manner, downloading region files are interleaved with the map task progress. The reduce phase is not started at this moment yet.
Eventually all the map tasks are completed. The JobTracker then notifies all the allocated TaskTrackers to proceed to the reduce phase. Each allocated TaskTracker will fork a ReduceTask (separate JVM) to read the downloaded file (which is already sorted by key) and invoke the “reduce” function, which collects the key/aggregatedValue into the final output file (one per reducer node). Note that each reduce task (and map task as well) is single-threaded. And this thread will invoke the reduce(key, values) function in assending (or descending) order of the keys assigned to this reduce task. This provides an interesting property that all entries written by the reduce() function is sorted in increasing order. The output of each reducer is written to a temp output file in HDFS. When the reducer finishes processing all keys, the temp output file will be renamed atomically to its final output filename.
The Map/Reduce framework is resilient to crashes of any components. TaskTracker nodes periodically report their status to the JobTracker which keeps track of the overall job progress. If the JobTracker hasn’t heard from any TaskTracker nodes for a long time, it assumes the TaskTracker node has been crashed and will reassign its tasks appropriately to other TaskTracker nodes. Since the map phase result is stored in the local disk, which will not be available when the TaskTracker node crashes. In case a map-phase TaskTracker node crashes, the crashed MapTasks (regardless of whether it is complete or not) will be reassigned to a different TaskTracker node, which will rerun all the assigned splits. However, the reduce phase result is stored in HDFS, which is available even the TaskTracker node crashes. Therefore, in case a reduce-phase TaskTracker node crashes, only the incomplete ReduceTasks need to be reassigned to a different TaskTracker node, where the incompleted reduce tasks will be re-run.
The job submission process is asynchronous. Client program can poll for the job status at any time by supplying the job id.
Hadoop Configurations and Tuning
To achieve maximum results from Hadoop implementations, some key considerations for configuring the Hadoop environment itself, which are described in this section. Similar to the system hardware and software recommendations given above, these settings must be tailored to the needs of the individual implementation, and users are encouraged to experiment with their own systems and environment.Nevertheless, there are factors in common among Hadoop deployments that provide general guidance.
5.1 General Configurations
• The numbers of NameNode and JobTracker server threads that handle remote procedure calls (RPCs), specified by dfs.namenode.handler.count and mapred.job.tracker.handler.count, respectively, both default to 10 and should be set to a larger number (for example, 64) for large clusters.
• The number of DataNode server threads that handle RPCs, as specified by dfs.datanode.handler.count, defaults to three and should be set to a larger number (for example, eight) if there are a large number of HDFS clients. (Note: Every additional thread consumes more memory.)
• The number of work threads on the HTTP server that runs on each TaskTracker to handle the output of map tasks on that server, as specified by tasktracker.http.threads, should be set in the range of 40 to 50 for large clusters.
5.2 HDFS-Specific Configurations
• The replication factor for each block of an HDFS file, as specified by dfs.replication, is typically set to three for fault-tolerance; setting it to a smaller value is not recommended.
• The default HDFS block size, as specified by dfs.block.size, is 64 MB in HDFS, and it is usually desirable to use a larger block size (such as 128 MB) for large file systems.
5.3 Map/Reduce-Specific Configurations
• The maximum number of map/ reduce tasks that run simultaneously on a TaskTracker, as specified by mapred. tasktracker.{map/reduce}.tasks.maximum, should usually be set in the range of(cores_per_node)/2 to 2x(cores_per_ node), especially for large clusters.
• The number of input streams (files) to be merged at once in the map/reduce tasks, as specified by io.sort.factor, should be set to a sufficiently large value (for example, 100) to minimize disk accesses.
• The JVM settings should have the parameter java.net.preferIPv4Stack set to true, to avoid timeouts in cases where the OS/JVM picks up an IPv6 address and must resolve the hostname.
5.4 Map Task-Specific Configurations
• The total size of result and metadata buffers associated with a map task, as specified by io.sort.mb, defaults to 100 MB and can be set to a higher level, such as 200 MB.
• The percentage of total buffer size that is dedicated to the metadata buffer, as specified by io.sort.record.percent, which defaults to 0.05, should be adjusted according to the key-value pair size of the particular Hadoop job
5.1 General Configurations
• The numbers of NameNode and JobTracker server threads that handle remote procedure calls (RPCs), specified by dfs.namenode.handler.count and mapred.job.tracker.handler.count, respectively, both default to 10 and should be set to a larger number (for example, 64) for large clusters.
• The number of DataNode server threads that handle RPCs, as specified by dfs.datanode.handler.count, defaults to three and should be set to a larger number (for example, eight) if there are a large number of HDFS clients. (Note: Every additional thread consumes more memory.)
• The number of work threads on the HTTP server that runs on each TaskTracker to handle the output of map tasks on that server, as specified by tasktracker.http.threads, should be set in the range of 40 to 50 for large clusters.
5.2 HDFS-Specific Configurations
• The replication factor for each block of an HDFS file, as specified by dfs.replication, is typically set to three for fault-tolerance; setting it to a smaller value is not recommended.
• The default HDFS block size, as specified by dfs.block.size, is 64 MB in HDFS, and it is usually desirable to use a larger block size (such as 128 MB) for large file systems.
5.3 Map/Reduce-Specific Configurations
• The maximum number of map/ reduce tasks that run simultaneously on a TaskTracker, as specified by mapred. tasktracker.{map/reduce}.tasks.maximum, should usually be set in the range of(cores_per_node)/2 to 2x(cores_per_ node), especially for large clusters.
• The number of input streams (files) to be merged at once in the map/reduce tasks, as specified by io.sort.factor, should be set to a sufficiently large value (for example, 100) to minimize disk accesses.
• The JVM settings should have the parameter java.net.preferIPv4Stack set to true, to avoid timeouts in cases where the OS/JVM picks up an IPv6 address and must resolve the hostname.
5.4 Map Task-Specific Configurations
• The total size of result and metadata buffers associated with a map task, as specified by io.sort.mb, defaults to 100 MB and can be set to a higher level, such as 200 MB.
• The percentage of total buffer size that is dedicated to the metadata buffer, as specified by io.sort.record.percent, which defaults to 0.05, should be adjusted according to the key-value pair size of the particular Hadoop job
System Software Selection and Configuration- Hadoop
1.1 Selecting the Operating System and JVM
Using a Linux* distribution based on kernel version 2.6.30 or later is recommended when deploying Hadoop on current generation servers because of the optimizations included for energy and threading efficiency. For example, Intel has observed that energy consumption can be up to 60 percent (42 watts) higher at idle for each server using older versions of Linux.6 Such power inefficiency, multiplied over a large Hadoop cluster, could amount to significant additional energy costs. For better performance, the local file systems (for example, ext3 or xfs) are usually mounted with the no atime attribute. In addition, Sun Java* 6 is required to run Hadoop, and the latest version (Java 6u14 or later) is recommended to take advantage of optimizations such as compressed ordinary object pointers.
The default Linux open file descriptor limit is set to 1024, which is usually too low for Hadoop daemons. This setting should be increased to approximately 64,000 using the /etc/security/limits.conf file or alternate means. If the Linux kernel 2.6.28 is used, the default open epoll file descriptor limit is 128, which is too low for Hadoop and should be increased to approximately 4096 using the /etc/sysctl.conf file or alternate means.
1.2 Choosing Hadoop Versions and Distributions
When selecting a version of Hadoop for the implementation, organizations must seek a balance between the enhancements available from the most recent available release and the stability available from more mature versions.
For example, at the time of this writing, the most recent stable version of Hadoop is 0.18.3, while the latest release of Hadoop, version 0.20.0, contains important enhancements, including pluggable scheduling API, capacity scheduler, fair scheduler, and multiple task assignment. One other potential advantage of using Hadoop 0.20.0 is in the area of performance. Intel’s lab testing shows that some workloads within Hadoop can benefit from the multi-task assignment features in 0.20.0. Although the Map stage in v0.20.0 is slower and uses more memory than v0.19.1, the overall job runs at about the same speed or up to 8
percent faster in v0.20.0 in the case of Hadoop Sort
The primary source for securing the latest distribution is the Apache Software Foundation Web site (www.apache.org).For companies planning Hadoop installations, it may be worthwhile to
evaluate the Cloudera distribution, which includes RPM and Debian* packaging and tools for configuration. Intel has used Cloudera’s distribution on some of its lab systems for performance testing.
Server Hardware Configuration - Hadoop
One of the most important decisions in planning a Hadoop infrastructure deployment is the number, type, and configuration of the servers specified. As with other workloads, depending on the specific Hadoop application, computation may be bound by I/O, memory, or processor resources. Those requirements will require the system-level hardware to be adjusted on a case-bycase basis, but the general guidelines suggested in this section provide a point of departure for that fine-tuning.
1.1 Choosing a Server Platform
Typically, dual-socket servers are optimal for Hadoop deployments. Servers of this type are generally more efficient than large-scale multi-processor platforms for massively distributed implementations such as Hadoop, from a per-node, cost benefit perspective. Similarly, dual-socket servers more than offset the added per node hardware cost relative to entry-level servers through superior efficiencies in terms of load-balancing and parallelization overheads. Choosing hardware based on the most current platform technologies available helps to ensure the optimal intraserver throughput and energy efficiency.
1.2 Selecting and Configuring the Hard Drive
A relatively large number of hard drives per server (typically four to six) is recommended. While it is possible to use RAID 0 to concatenate smaller drives into a larger pool, using RAID on Hadoop servers is generally not recommended because Hadoop itself orchestrates data provisioning and redundancy across individual nodes. This approach provides good results across a wide spectrum of workloads because of the way that Hadoop interacts with storage.
The optimal balance between cost and performance is generally achieved with 7200 RPM SATA drives. This is likely to evolve quickly with the evolution of drive technologies, but it is a useful rule of thumb at the time of this writing. Hard drives should run in the AHCI (Advanced Host Controller Interface) mode with NCQ (Native Command Queuing) enabled, to improve performance when multiple simultaneous read/write requests are outstanding.
1.3 Memory Sizing
Sufficient memory capacity is critical for efficient operation of servers in a Hadoop cluster, supporting high throughput by allowing large numbers of map/reduce tasks to be carried out simultaneously. Typical Hadoop applications require approximately 1–2 GB of RAM per processor core, which corresponds to 8–16 GB for a dual-socket server using quad-core processors. When deploying servers based on the Intel® Xeon® processor 5500 series, it is recommended that DIMMs (dual in-line memory modules) be populated in multiples of six to balance across available memory channels (that is, system configurations of 12 GB, 24 GB, and so on). As a final consideration, ECC (error-correcting code) memory is highly recommended for Hadoop, to detect and correct errors introduced during storage and transmission of data.
1.4 Selecting a Motherboard
To maximize the energy efficiency and performance of a Hadoop cluster, it is important to select the server motherboard carefully. Hadoop deployments do not require many of the features typically found in an enterprise data center server, and the motherboard selected should use high efficiency voltage regulators and be optimized for airflow. Many vendors have designed systems based on Intel Xeon processors with those characteristics; they are typically marketed to cloud computing or Internet data center providers. One such product is the Intel® Server Board S5500WB (formerly code-named Willowbrook), which has been pecifically designed for high-density computing environments. Selecting a server with the right motherboard can have a positive financial impact to the bottom line compared to using other enterprise-focused systems that lack similar optimizations.
1.5 Specifying a Power Supply
As a key means of reducing overall cost of ownership, organizations should specify, as part of the design and planning process, their energy-efficiency requirements for server power supplies. Power supplies certified by the 80 PLUS* Program (www.80plus.org) at various levels, including bronze, silver, and gold (with gold being the most efficient), provide organizations with objective standards to use during the procurement process.
1.6 Choosing Processors
The processor plays an important role in determining the speed, throughput, and efficiency of Hadoop clusters. The Intel Xeon processor 5500 series provides excellent performance for highly distributed workloads such as those asociated with Hadoop applications. Lab testing was performed to establish the performance benefits of the Intel Xeon processor 5500 series relative to previous-generation Intel processors
HDFS and Map Reduce Architecture
Master server is controling all the activities in the cluster and slave/Data node works for master node.
Here in the hadoop environment , Master Node server is called Name node and Slave node is called Data node.
1. NameNode server
2. Data server
Namenode: Namenode is managing the file system metadata and also provides control service to the hadoop cluster. There will be only one namenode process running in a hadoop file system in the cluster environment.
Backupnode: Namenode is a single point of failure in a hadoop file system environment. So to overcome this failure, Backup node is used to copy the meta data file system from the namenode at frequent interval.
Datanode: Datanode is used for storing the data and retrieval of the data. There will be multiple processes are running in a cluster. Typically one datanode process per storage node.
Job Tracker: Job tracker accepts jobs and submissions of the jobs in a cluster environment. It also distributing/controlling the jobs in a cluster enviroment.
Distributed jobs are handover to TaskTracker process in a datanode.
TaskTracker: It manages the execution of the individual map and reduce task in the datanode
![](file:///C:/DOCUME%7E1/ADMINI%7E1.CLO/LOCALS%7E1/Temp/moz-screenshot.png)
Ten Ways To Improve the RDBMS with Hadoop
1. Accelerating nightly batch business processes. Many organizations have production transaction systems that require nightly processing and have narrow windows to perform their calculations and analysis before the start of the next day. Since Hadoop can scale linearly, this can enable internal or external on-demand cloud farms to dynamically handle shrink performance windows and take on larger volume situations that an RDBMS just can't easily deal with. This doesn't elide the import/export challenges depending on the application but can certainly compress the windows between them.
2. Storage of extremely high volumes of enterprise data. The Hadoop Distributed File System is a marvel in itself and can be used to hold extremely large data sets safely on commodity hardware long term that otherwise couldn't stored or handled easily in a relational database. I am specifically talking about volumes of data that today's RDBMS's would still have trouble with, such as dozens or hundreds of petabytes and which are common in genetics, physics, aerospace, counter intelligence and other scientific, medical, and government applications.
3. Creation of automatic, redundant backups. Hadoop can then keep the data that it processes, even after it it's been imported into other enterprise systems. HDFS creates a natural, reliable, and easy-to-use backup environment for almost any amount of data at reasonable prices considering that it's essentially a high-speed online data storage environment.
4. Improving the scalability of applications. Very low cost commodity hardware can be used to power Hadoop clusters since redundancy and fault resistance is built into the software instead of using expensive enterprise hardware or software alternatives with proprietary solutions. This makes adding more capacity (and therefore scale) easier to achieve and Hadoop is an affordable and very granular way to scale out instead of up. While there can be cost in converting existing applications to Hadoop, for new applications it should be a standard option in the software selection decision tree. Note: Hadoop's fault tolerance is acceptable, not best-of-breed, so check this against your application's requirements.
5. Use of Java for data processing instead of SQL. Hadoop is a Java platform and can be used by just about anyone fluent in the language (other language options are coming available soon via APIs.) While this won't help shops that have plenty of database developers, Hadoop can be a boon to organizations that have strong Java environments with good architecture, development, and testing skills. And while yes, it's possible to use languages such as Java and C++ to write stored procedures for an RDBMS, it's not a widespread activity.
6. Producing just-in-time feeds for dashboards and business intelligence. Hadoop excels at looking at enormous amounts of data and providing detailed analysis of business data that an RDBMS would often take too long or would be too expensive to carry out. Facebook, for example, uses Hadoop for daily and hourly summaries of its 150 million+ monthly visitors. The resulting information can be quickly transferred to BI, dashboards, or mashup platforms.
7. Handling urgent, ad hoc requests for data. While certainly expensive enterprise data warehousing software can do this, Hadoop is a strong performer when it comes to quickly asking and getting answers to urgent questions involving extremely large datasets.
8. Turning unstructured data into relational data. While ETL tools and bulk load applications work well with smaller datasets, few can approach the data volume and performance that Hadoop can, especially at a similar price/performance point. The ability to take mountains of inbound or existing business data, spread the work over a large distributed cloud, add structure, and import the result into an RDBMS makes Hadoop one of the most powerful database import tools around.
9. Taking on tasks that require massive parallelism. Hadoop has been known to scale out to thousands of nodes in production environments. Even better, It requires relatively little innate programing skill to achieve since parallelism is an intrinsic property of the platform. While you can do the same with SQL, it requires some skill and experience with the techniques. In other words, you have to know what you're doing. For organizations that are experiencing ceilings with their current RDBMS, you can look at Hadoop to help break through them.
10. Moving existing algorithms, code, frameworks, and components to a highly distributed computing environment. Done right -- and there are challenges depending on what your legacy code wants to do -- and Hadoop can be used as a way to migrate old, single core code into a highly distributed environment to provide efficient, parallel access to ultra-large datasets. Many organizations already have proven code that is tested and hardened and ready to use but is limited without an enabling framework. Hadoop adds the mature distributed computing layer than can transition these assets to a much larger and more powerful modern environment
2. Storage of extremely high volumes of enterprise data. The Hadoop Distributed File System is a marvel in itself and can be used to hold extremely large data sets safely on commodity hardware long term that otherwise couldn't stored or handled easily in a relational database. I am specifically talking about volumes of data that today's RDBMS's would still have trouble with, such as dozens or hundreds of petabytes and which are common in genetics, physics, aerospace, counter intelligence and other scientific, medical, and government applications.
3. Creation of automatic, redundant backups. Hadoop can then keep the data that it processes, even after it it's been imported into other enterprise systems. HDFS creates a natural, reliable, and easy-to-use backup environment for almost any amount of data at reasonable prices considering that it's essentially a high-speed online data storage environment.
4. Improving the scalability of applications. Very low cost commodity hardware can be used to power Hadoop clusters since redundancy and fault resistance is built into the software instead of using expensive enterprise hardware or software alternatives with proprietary solutions. This makes adding more capacity (and therefore scale) easier to achieve and Hadoop is an affordable and very granular way to scale out instead of up. While there can be cost in converting existing applications to Hadoop, for new applications it should be a standard option in the software selection decision tree. Note: Hadoop's fault tolerance is acceptable, not best-of-breed, so check this against your application's requirements.
5. Use of Java for data processing instead of SQL. Hadoop is a Java platform and can be used by just about anyone fluent in the language (other language options are coming available soon via APIs.) While this won't help shops that have plenty of database developers, Hadoop can be a boon to organizations that have strong Java environments with good architecture, development, and testing skills. And while yes, it's possible to use languages such as Java and C++ to write stored procedures for an RDBMS, it's not a widespread activity.
6. Producing just-in-time feeds for dashboards and business intelligence. Hadoop excels at looking at enormous amounts of data and providing detailed analysis of business data that an RDBMS would often take too long or would be too expensive to carry out. Facebook, for example, uses Hadoop for daily and hourly summaries of its 150 million+ monthly visitors. The resulting information can be quickly transferred to BI, dashboards, or mashup platforms.
7. Handling urgent, ad hoc requests for data. While certainly expensive enterprise data warehousing software can do this, Hadoop is a strong performer when it comes to quickly asking and getting answers to urgent questions involving extremely large datasets.
8. Turning unstructured data into relational data. While ETL tools and bulk load applications work well with smaller datasets, few can approach the data volume and performance that Hadoop can, especially at a similar price/performance point. The ability to take mountains of inbound or existing business data, spread the work over a large distributed cloud, add structure, and import the result into an RDBMS makes Hadoop one of the most powerful database import tools around.
9. Taking on tasks that require massive parallelism. Hadoop has been known to scale out to thousands of nodes in production environments. Even better, It requires relatively little innate programing skill to achieve since parallelism is an intrinsic property of the platform. While you can do the same with SQL, it requires some skill and experience with the techniques. In other words, you have to know what you're doing. For organizations that are experiencing ceilings with their current RDBMS, you can look at Hadoop to help break through them.
10. Moving existing algorithms, code, frameworks, and components to a highly distributed computing environment. Done right -- and there are challenges depending on what your legacy code wants to do -- and Hadoop can be used as a way to migrate old, single core code into a highly distributed environment to provide efficient, parallel access to ultra-large datasets. Many organizations already have proven code that is tested and hardened and ready to use but is limited without an enabling framework. Hadoop adds the mature distributed computing layer than can transition these assets to a much larger and more powerful modern environment
RDBMS and Hadoop
Here is a comparison of the overall differences between the RDBMS and MapReduce-based systems such as Hadoop:
/*
=Start table structure
*/
#Summary, #Caption {
width: 28em;
}
.tableStyle {
width: 80%;
margin: 1em 0 1em 5%;
border: solid #666;
border-width: 1px 0 0 1px;
border-collapse: collapse;
}
.tableStyle th, .tableStyle td {
border: solid 1px #666;
border-width: 0 1px 1px 0;
padding: 0.2em;
}
/*
=End table structure
*/
/*
=Start grey colour scheme
*/
.greyScheme, .greyScheme th, .greyScheme td {
border-color: #666;
}
.greyScheme .even {
background-color: #E3F6FE;
}
.greyScheme th, .greyScheme thead td {
background-color: #B1B1B1;
}
.greyScheme th.firstColumn {
background-color: #D1D1D1;
}
/*
=End grey colour scheme
*/
________________________________________
RDBMS MapReduce
Data size Gigabytes Petabytes
Access Interactive and batch Batch
Structure Fixed schema Unstructured schema
Language SQL Procedural (Java, C++, Ruby, etc)
Integrity High Low
Scaling Nonlinear Linear
Updates Read and write Write once, read many times
Latency Low High
From this it's clear that the MapReduce model cannot replace the traditional enterprise RDBMS. However, it can be a key enabler of a number of interesting scenarios that can considerably increase flexibility, turn-around times, and the ability to tackle problems that weren't possible before.
With the latter the key is that SQL-based processing of data tends not to scale linearly after a certain ceiling, usually just a handful of nodes in a cluster. With MapReduce, you can consistently get performance gains by increasing the size of the cluster. In other words, double the size of Hadoop cluster and a job will run twice as fast, triple it and the same thing, etc.
/*
=Start table structure
*/
#Summary, #Caption {
width: 28em;
}
.tableStyle {
width: 80%;
margin: 1em 0 1em 5%;
border: solid #666;
border-width: 1px 0 0 1px;
border-collapse: collapse;
}
.tableStyle th, .tableStyle td {
border: solid 1px #666;
border-width: 0 1px 1px 0;
padding: 0.2em;
}
/*
=End table structure
*/
/*
=Start grey colour scheme
*/
.greyScheme, .greyScheme th, .greyScheme td {
border-color: #666;
}
.greyScheme .even {
background-color: #E3F6FE;
}
.greyScheme th, .greyScheme thead td {
background-color: #B1B1B1;
}
.greyScheme th.firstColumn {
background-color: #D1D1D1;
}
/*
=End grey colour scheme
*/
________________________________________
RDBMS MapReduce
Data size Gigabytes Petabytes
Access Interactive and batch Batch
Structure Fixed schema Unstructured schema
Language SQL Procedural (Java, C++, Ruby, etc)
Integrity High Low
Scaling Nonlinear Linear
Updates Read and write Write once, read many times
Latency Low High
From this it's clear that the MapReduce model cannot replace the traditional enterprise RDBMS. However, it can be a key enabler of a number of interesting scenarios that can considerably increase flexibility, turn-around times, and the ability to tackle problems that weren't possible before.
With the latter the key is that SQL-based processing of data tends not to scale linearly after a certain ceiling, usually just a handful of nodes in a cluster. With MapReduce, you can consistently get performance gains by increasing the size of the cluster. In other words, double the size of Hadoop cluster and a job will run twice as fast, triple it and the same thing, etc.
Parameters consideration in Map Reduce
1. Speed. The seek times of physical storage is not keeping pace with improvements in network speeds.
2. Scale. The difficulty of scaling the RDBMS out efficiently (i.e. clustering beyond a handful of servers is notoriously hard.)
3. Integration. Today's data processing tasks increasingly have to access and combine data from many different non-relational sources, often over a network.
4. Volume. Data volumes have grown from tens of gigabytes in the 1990s to hundreds of terabytes and often petabytes in recent years.
2. Scale. The difficulty of scaling the RDBMS out efficiently (i.e. clustering beyond a handful of servers is notoriously hard.)
3. Integration. Today's data processing tasks increasingly have to access and combine data from many different non-relational sources, often over a network.
4. Volume. Data volumes have grown from tens of gigabytes in the 1990s to hundreds of terabytes and often petabytes in recent years.
Java-based MapReduce Implementation
The Google environment is customized for their needs and to fit their operational model. For example, Google uses a proprietary file system for storing files that’s optimized for the type of operations that their MapReduce implementations are likely to perform. Enterprise applications, on the other hand, are built on top of Java or similar technologies, and rely on existing file systems, communication protocols, and application stacks.
A Java-based implementation of MapReduce should take into account existing data storage facilities, which protocols are supported by the organization where it will be deployed, the internal APIs at hand, and the availability of third-party products (open-source or commercial) that will support deployment. Figure 2 shows how the general architecture could be mapped to robust, existing Java open-source infrastructure for this implementation.
The Terracotta clustering technology is a great choice for sharing data between the map() and reduce() tasks because it abstracts all the communications overhead involved in sharing files or using RPC calls to initiate processing of the results. The map() and reduce() tasks are implemented in the same application core, as described in the previous section. The data structures for sharing intermediate result sets could be kept in memory data structures that are in turn shared transparently by Terracotta. Interprocess communication issues disappear to the MapReduce implementers since the Terracotta runtime is in charge of sharing those data structures across the cluster with MapReduce that uses its runtime. Instead of implementing a complex signaling system, all the map() tasks need to do is flag intermediate result sets in memory and the reduce() tasks will fetch them directly.
Both the controller and the main application are likely to be in a wait state for a while even with the massive parallelization available through the MapReduce cluster. Signaling between these two components, and between the reduce() tasks and the controller when reduction is complete, is done over the Mule ESB. In this manner, results output could be queued up for processing by other applications, or a Mule service object (or UMO) can take these output results and split them into buckets for another MapReduce pass, as described in the previous section. Mule supports synchronous and asynchronous data transfers in memory, across all major enterprise protocols, or over raw TCP/IP sockets. Mule can be used to move results output between applications executing in the same machine, across the data center, or in a different location entirely with little programmer participation beyond identifying a local endpoint and letting Mule move and transform the data toward its destination.
Another Java-based implementation could be through Hadoop, a Lucene-derived framework for deploying distributed applications running on large clusters of commodity computers. Hadoop is an open-source, end-to-end, general-purpose implementation of MapReduce.
High Level Architecture for Map/Reduce
The central idea behind Map/Reduce is distributed processing. You have a cluster of machines that host a shared file system where the data is stored, and allow for distributed job management.
Processes run across the cluster are called jobs. These jobs have two phases: a map phase where the data is collected, and a reduce phase where the data is further processed to create a final result set.
Letʼs take the example of finding a soul mate on a dating site. All the profiles for millions of users are stored in the shared file system, which is spread across a cluster of machines.
We submit a job to the cluster with the profile pattern that we’re seeking. In the map phase, the cluster finds all the profiles that match our requirements. In the reduce phase these profiles are sorted to find the top matches, among which only the top ten are returned.
Hadoop is a Map/Reduce framework that’s broken into two large pieces. The first is the Hadoop File System (HDFS). This is a distributed file system written in Java that works much like a standard Unix file system. On top of that is the Hadoop job execution system. This system coordinates the jobs, prioritizes and monitors them, and provides a framework that you can use to develop your own types of jobs. It even has a handy web page where you can monitor the progress of your jobs.
From a high level the Hadoop cluster system looks like Figure 1, “The Hadoop cluster architecture”.\
Processes run across the cluster are called jobs. These jobs have two phases: a map phase where the data is collected, and a reduce phase where the data is further processed to create a final result set.
Letʼs take the example of finding a soul mate on a dating site. All the profiles for millions of users are stored in the shared file system, which is spread across a cluster of machines.
We submit a job to the cluster with the profile pattern that we’re seeking. In the map phase, the cluster finds all the profiles that match our requirements. In the reduce phase these profiles are sorted to find the top matches, among which only the top ten are returned.
Hadoop is a Map/Reduce framework that’s broken into two large pieces. The first is the Hadoop File System (HDFS). This is a distributed file system written in Java that works much like a standard Unix file system. On top of that is the Hadoop job execution system. This system coordinates the jobs, prioritizes and monitors them, and provides a framework that you can use to develop your own types of jobs. It even has a handy web page where you can monitor the progress of your jobs.
From a high level the Hadoop cluster system looks like Figure 1, “The Hadoop cluster architecture”.\
Subscribe to:
Posts (Atom)