Hadoop uses the MapReduce programming model for data processing, in which the inputs and outputs of the map and reduce functions are represented as key-value pairs. These functions run in parallel over datasets spread across many machines in a distributed architecture. The paradigm is essentially functional in nature, combining the techniques of map and reduce. This article introduces the MapReduce model and, in particular, how data in various formats, from simple text to structured binary objects, is used.
MapReduce Types
Mapping is the core technique of processing a list of data elements that come in pairs of keys and values. The map function is applied to each key-value pair in the list and produces a new list. The general form of the map and reduce functions in Hadoop can be illustrated as follows:
map: (K1, V1) -> list(K2, V2)
reduce: (K2, list(V2)) -> list(K3, V3)
The input key and value types of the map function, K1 and V1, can differ from its output types, K2 and V2. The reduce function accepts the types output by the map, with the values for each key grouped into a list, and its own output types, K3 and V3, can differ again. The Java API for this is as follows:
public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
    void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
        throws IOException;
}

public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
    void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output, Reporter reporter)
        throws IOException;
}
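To make the type flow concrete, here is a minimal sketch of a word count written in plain Java rather than against the Hadoop API. The class WordCountSketch and its methods map, reduce and run are hypothetical names chosen for illustration, and run only imitates what the framework does between the two phases (grouping the map output by key). It assumes K1 = Long, V1 = String, K2 = String, V2 = Integer, K3 = String and V3 = Long.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of the map/reduce signatures.
// None of these names belong to the Hadoop API.
class WordCountSketch {

    // map: (K1, V1) -> list(K2, V2)
    // K1 = Long (line offset), V1 = String (line), K2 = String, V2 = Integer
    static List<Map.Entry<String, Integer>> map(Long offset, String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new SimpleImmutableEntry<>(word, 1));
            }
        }
        return out;
    }

    // reduce: (K2, list(V2)) -> list(K3, V3)
    // K3 = String, V3 = Long, to show that the output types may differ.
    static List<Map.Entry<String, Long>> reduce(String word, List<Integer> counts) {
        long sum = 0;
        for (int count : counts) {
            sum += count;
        }
        Map.Entry<String, Long> total = new SimpleImmutableEntry<>(word, sum);
        return Collections.singletonList(total);
    }

    // Stand-in for the framework: map every record, group by K2, then reduce.
    static List<Map.Entry<String, Long>> run(List<Map.Entry<Long, String>> input) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<Long, String> record : input) {
            for (Map.Entry<String, Integer> kv : map(record.getKey(), record.getValue())) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        List<Map.Entry<String, Long>> result = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            result.addAll(reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> input = new ArrayList<>();
        input.add(new SimpleImmutableEntry<>(0L, "to be or not"));
        input.add(new SimpleImmutableEntry<>(13L, "to be"));
        for (Map.Entry<String, Long> kv : run(input)) {
            System.out.println(kv.getKey() + "\t" + kv.getValue());
        }
    }
}
```

In a real job the grouping step is performed by the framework's shuffle and sort across machines; the sketch keeps it in memory only to show how the types line up.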
The OutputCollector is the general interface the MapReduce framework provides for collecting the data output by either the Mapper or the Reducer. The Mapper's output is the intermediate output of the job and the Reducer's output is the final one, so the collector must be parameterized with the appropriate key and value types. The Reporter lets a MapReduce application report progress and update counters and status information. If a combine function is used, it has the same form as the reduce function, and its output is fed to the reduce function. This may be illustrated as follows:
map: (K1, V1) -> list(K2, V2)
combine: (K2, list(V2)) -> list(K2, V2)
reduce: (K2, list(V2)) -> list(K3, V3)
Note that the combine and reduce functions have the same form, except that the output types of the combine function are the intermediate types: K3 is K2 and V3 is V2.
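The effect of a combine function can be sketched in plain Java as follows. CombinerSketch, combine, reduceAll and kv are hypothetical names, not Hadoop classes, and the example assumes a summing job, where pre-aggregating on the map side cannot change the final result because addition is associative and commutative.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of a combiner; not part of the Hadoop API.
class CombinerSketch {

    static Map.Entry<String, Integer> kv(String key, int value) {
        return new SimpleImmutableEntry<>(key, value);
    }

    // combine: (K2, list(V2)) -> list(K2, V2)
    // Applied to the output of a single map task: pre-sums the counts per key,
    // so fewer pairs have to be sent to the reduce side.
    static List<Map.Entry<String, Integer>> combine(List<Map.Entry<String, Integer>> mapOutput) {
        Map<String, Integer> partial = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : mapOutput) {
            partial.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : partial.entrySet()) {
            out.add(kv(e.getKey(), e.getValue()));
        }
        return out;
    }

    // Reduce side: sum all values per key across the outputs of every map task.
    static Map<String, Integer> reduceAll(List<List<Map.Entry<String, Integer>>> perTaskOutput) {
        Map<String, Integer> totals = new TreeMap<>();
        for (List<Map.Entry<String, Integer>> taskOutput : perTaskOutput) {
            for (Map.Entry<String, Integer> pair : taskOutput) {
                totals.merge(pair.getKey(), pair.getValue(), Integer::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> task1 = Arrays.asList(kv("a", 1), kv("b", 1), kv("a", 1));
        List<Map.Entry<String, Integer>> task2 = Collections.singletonList(kv("a", 1));

        // Without a combiner, four pairs reach the reduce side.
        System.out.println(reduceAll(Arrays.asList(task1, task2)));
        // With a combiner, task1 sends two pairs instead of three; the totals are unchanged.
        System.out.println(reduceAll(Arrays.asList(combine(task1), combine(task2))));
    }
}
```

A function such as a mean would not be safe to reuse this way without restructuring, since averaging partial averages does not in general give the overall average.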
The partition function operates on the intermediate key and value types (K2 and V2). It controls how the keys of the intermediate map outputs are partitioned. By default, the partition is derived from the key using a hash function. The total number of partitions is the same as the number of reduce tasks for the job. In practice, the partition is determined only by the key; the value is ignored.
public interface Partitioner<K2, V2> extends JobConfigurable {
    int getPartition(K2 key, V2 value, int numPartitions);
}
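A hash-based partition function can be sketched in a few lines. The class below is a standalone illustration with hypothetical names (HashPartitionSketch, getPartition); the expression it uses, masking the sign bit of hashCode() and taking the remainder, follows the approach of Hadoop's default HashPartitioner, but the sketch does not implement the Partitioner interface itself.

```java
// Plain-Java illustration of hash partitioning; not part of the Hadoop API.
class HashPartitionSketch {

    // Returns a partition index in the range [0, numPartitions).
    // Only the key is consulted; a value parameter would be ignored.
    static int getPartition(Object key, int numPartitions) {
        // Mask the sign bit so that a negative hash code still yields
        // a non-negative index.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int numReduceTasks = 4;
        for (String key : new String[] {"to", "be", "or", "not"}) {
            System.out.println(key + " -> partition " + getPartition(key, numReduceTasks));
        }
    }
}
```

Because the index depends only on the key, every pair with the same key reaches the same reduce task, which is what allows the reduce function to see all values for that key together.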
That, in short, is the essence of MapReduce types.
Input Formats
Hadoop has to accept and process a variety of formats, from text files to databases. A chunk of input, called an input split, is processed by a single map task. Each split is further divided into logical records, which are given to the map function as key-value pairs. In the context of a database, a split corresponds to a range of tuples read from an SQL table, as done by DBInputFormat, which produces LongWritables containing record numbers as keys and DBWritables as values. The Java API for input splits is as follows:
public interface InputSplit extends Writable {
    long getLength() throws IOException;
    String[] getLocations() throws IOException;
}
An InputSplit represents the data to be processed by a single Mapper. It reports its length in bytes and the storage locations of the data, and holds a reference to the input data rather than the data itself. It presents a byte-oriented view of the input, and it is the responsibility of the job's RecordReader to process this and present a record-oriented view. In most cases, we do not deal with InputSplits directly because they are created by an InputFormat. It is the responsibility of the InputFormat to create the input splits and divide them into records.
public interface InputFormat<K, V> {
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
    RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
        throws IOException;
}
The JobClient invokes the getSplits() method, passing the desired number of splits as an argument. This number is only a hint; the actual number of splits may differ. Once the splits are calculated, they are sent to the jobtracker. The jobtracker uses their storage locations to schedule map tasks on the tasktrackers. On a tasktracker, the map task then passes its split to the getRecordReader() method of the InputFormat to obtain a RecordReader for that split.
FileInputFormat is the base class for InputFormats that use files as their data source. It is responsible for identifying the files to be included as job input and provides the implementation for generating splits from them.
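How a file might be carved into splits, and why the requested number of splits is only a hint, can be sketched as follows. This is a simplified model in plain Java with hypothetical names (SplitSizeSketch, computeSplitSize, getSplits). It assumes the split size is chosen as max(minSize, min(goalSize, blockSize)), where goalSize is the file length divided by the hint; the real FileInputFormat handles further details, such as multiple files and a small tolerance on the last split, that are left out here.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of file split calculation; not part of the Hadoop API.
class SplitSizeSketch {

    // Assumed rule: never smaller than minSize, never larger than a block
    // unless minSize forces it.
    static long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(Math.max(1, minSize), Math.min(goalSize, blockSize));
    }

    // Returns one {start, length} pair per split of a single file.
    static List<long[]> getSplits(long fileLength, int numSplitsHint, long minSize, long blockSize) {
        long goalSize = fileLength / Math.max(1, numSplitsHint);
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
        List<long[]> splits = new ArrayList<>();
        for (long start = 0; start < fileLength; start += splitSize) {
            splits.add(new long[] {start, Math.min(splitSize, fileLength - start)});
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // A 300 MB file with 128 MB blocks: a hint of 2 would mean 150 MB splits,
        // but the block size caps the split size, so three splits are produced.
        for (long[] split : getSplits(300 * mb, 2, 1, 128 * mb)) {
            System.out.println("start=" + split[0] / mb + " MB, length=" + split[1] / mb + " MB");
        }
    }
}
```

Capping the split size at the block size is what keeps each split local to the machines that store its block, which is what the scheduler relies on when it places map tasks.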
Hadoop also processes unstructured data, which often comes in textual format. TextInputFormat is the default InputFormat for such data.
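With TextInputFormat, each record is a line of input: the key is the byte offset of the start of the line within the file, and the value is the content of the line without its terminator. The record-oriented view this produces can be imitated in plain Java as below. LineRecordSketch and readRecords are hypothetical names, the sketch assumes UTF-8 text with '\n' line endings, and it ignores the handling of lines that straddle split boundaries.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java imitation of line-oriented records; not part of the Hadoop API.
class LineRecordSketch {

    // Turns a byte-oriented input into (byte offset, line) records.
    static List<Map.Entry<Long, String>> readRecords(byte[] data) {
        List<Map.Entry<Long, String>> records = new ArrayList<>();
        int lineStart = 0;
        for (int i = 0; i <= data.length; i++) {
            boolean atEnd = (i == data.length);
            // A record ends at a newline, or at end of input if a final
            // unterminated line is pending.
            boolean endOfRecord = atEnd ? lineStart < data.length : data[i] == '\n';
            if (endOfRecord) {
                String line = new String(data, lineStart, i - lineStart, StandardCharsets.UTF_8);
                records.add(new SimpleImmutableEntry<>((long) lineStart, line));
                lineStart = i + 1;
            }
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] data = "On the top\nof the hill\n".getBytes(StandardCharsets.UTF_8);
        for (Map.Entry<Long, String> record : readRecords(data)) {
            System.out.println("(" + record.getKey() + ", " + record.getValue() + ")");
        }
        // prints:
        // (0, On the top)
        // (11, of the hill)
    }
}
```

The offset is a byte position, not a line number, because a reader that starts in the middle of a file cannot know how many lines precede it.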
SequenceFileInputFormat reads binary input stored as sequence files, which hold sequences of binary key-value pairs.
Similarly, DBInputFormat provides the capability to read data from a relational database using JDBC.
Output Formats
The output format classes are similar to their corresponding input format classes and work in the reverse direction.
For example, TextOutputFormat is the default output format. It writes records as lines of plain text; keys and values may be of any type, and it turns them into strings by invoking their toString() method. Each key and value are separated by a tab character, although this can be customized through the separator property of the text output format.
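The way a record becomes a line of text can be sketched as follows. TextLineSketch and formatRecord are hypothetical names and the separator is passed as a plain argument instead of being read from the job configuration. The sketch also assumes that a missing key or value is simply left out together with the separator.

```java
// Plain-Java illustration of text record formatting; not part of the Hadoop API.
class TextLineSketch {

    // Builds one output line from a key and a value of any type.
    static String formatRecord(Object key, Object value, String separator) {
        StringBuilder line = new StringBuilder();
        if (key != null) {
            line.append(key.toString());
        }
        // The separator is written only when both parts are present.
        if (key != null && value != null) {
            line.append(separator);
        }
        if (value != null) {
            line.append(value.toString());
        }
        return line.toString();
    }

    public static void main(String[] args) {
        System.out.println(formatRecord("be", 2, "\t"));  // default, tab-separated
        System.out.println(formatRecord("be", 2, ","));   // customized separator
    }
}
```

Because everything goes through toString(), the type information is lost in the output file; a later job that reads it back has to parse the strings again.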
For binary output, there is SequenceFileOutputFormat to write a sequence of binary output to a file. Binary outputs are particularly useful if the output becomes input to a further MapReduce job.
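The advantage of binary output for chained jobs is that typed keys and values can be read back without any text parsing. The round trip below illustrates the idea with java.io data streams. BinaryPairsSketch, write and read are hypothetical names, and the byte layout (a record count followed by string and int pairs) is invented for this sketch; it is not the SequenceFile format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative binary key-value round trip; NOT the SequenceFile format.
class BinaryPairsSketch {

    // Serializes the pairs as: count, then (UTF string key, int value) per pair.
    static byte[] write(Map<String, Integer> pairs) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(pairs.size());
            for (Map.Entry<String, Integer> pair : pairs.entrySet()) {
                out.writeUTF(pair.getKey());
                out.writeInt(pair.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the pairs back with their original types, in the original order.
    static Map<String, Integer> read(byte[] data) {
        Map<String, Integer> pairs = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String key = in.readUTF();
                int value = in.readInt();
                pairs.put(key, value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return pairs;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("be", 2);
        counts.put("to", 2);
        byte[] encoded = write(counts);
        System.out.println(encoded.length + " bytes -> " + read(encoded));
    }
}
```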
Output to relational databases is handled by DBOutputFormat, which sends the reduced output to an SQL table. Output to HBase is handled separately: HBase's TableOutputFormat enables a MapReduce program to write its output to an HBase table.
Conclusion
This is, in short, the crux of MapReduce types and formats. Refer to the references listed below for more details. Many intricate details of the Java APIs become clear only when one dives into programming, so consult the Apache Hadoop Java API docs and start writing some practice code.
References
- Tom White, Hadoop: The Definitive Guide, O’Reilly
- Apache Hadoop Java API Docs