Recently I gave a talk about the Hadoop ecosystem at Igalia’s Master Open Sessions. The presentation is available here: “The Hadoop Stack: An overview of the Hadoop ecosystem”. Together with the presentation I promised the crowd there to post some introductory articles about Hadoop and HBase. How to install them, use them, create a basic Hadoop project, etc.

So in this post I will explain how to create a basic project with access to the Hadoop API. The project will be managed with Maven, and it will be able to execute Hadoop jobs from Eclipse.

Installing Maven

Maven is a project management tool, similar to Ant or Make, that allows us to build a project, add dependencies and control many different aspects of the project. Maven 3 is the latest version.

$ sudo apt-get install maven3

It is possible to have both Maven 3 and Maven 2 on the same system. If you already had Maven 2 installed, installing Maven 3 will make the mvn command point to Maven 3, while the mvn2 command will still execute Maven 2.

$ mvn --version
/usr/java/default  
Apache Maven 3.0.3 (rNON-CANONICAL_2011-10-11_11-57_mockbuild; 2011-10-11 13:57:02+0200)  
Maven home: /usr/share/maven  
Java version: 1.6.0_25, vendor: Sun Microsystems Inc.  
Java home: /usr/java/jdk1.6.0_25/jre  
Default locale: en_US, platform encoding: UTF-8  
OS name: "linux", version: "3.1.9-1.fc16.i686", arch: "i386", family: "unix"

Now install Eclipse. If you don’t have it on your system, go to http://www.eclipse.org/downloads/, download Eclipse Classic and install it.

Installing the Maven plugin for Eclipse

Together with Eclipse we will install the Maven plugin for Eclipse. This plugin makes working with Maven from Eclipse easier. The plugin is called M2E. To install it follow these steps:

  • Help->Install new software…
  • Click on Add, set the following values and click OK:
  • Name: m2e
  • Location: http://download.eclipse.org/technology/m2e/releases/
  • Click Next to install the plugin

Creating a Maven project

Now we will create a basic skeleton of a project:

  • File->New->Project…
  • Select Maven->Maven Project. Click Next.
  • Click Next again.
  • Archetype: org.apache.maven.archetypes, maven-archetype-quickstart (the option selected by default)
  • Set the group, artifact and version for the project:
  • Group: com.igalia
  • Artifact: wordcount
  • Version: 0.0.1-SNAPSHOT
  • Click Finish.
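
The quickstart archetype generates a minimal application class at src/main/java/com/igalia/wordcount/App.java. It should look roughly like this (the generated comments and formatting may differ):

package com.igalia.wordcount;

/**
 * Hello world!
 */
public class App {

    public static void main( String[] args ) {
        System.out.println( "Hello World!" );
    }
}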

To run the project from Eclipse:

  • Right-click on project
  • Run as->Java application

or simply type ‘Shift + Alt + X’ and then ‘J’.

To run the project from the command line, we first need to add the Exec Maven Plugin. All Maven projects have a pom.xml file in their root folder. This file contains all the information needed to manage a software project (configuration, plugins, dependencies, etc.). Open it and add the following:

<build>
    <plugins>
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.2.1</version>
            <executions>
                <execution>
                    <goals>
                        <goal>java</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <mainClass>com.igalia.wordcount.App</mainClass>
            </configuration>
        </plugin>
    </plugins>
</build>

It is also a good idea to add a section specifying the compiler configuration. This way we can tell Maven which version of Java to use to compile our project. Add the following under the <plugins> section:

<!-- Compiler configuration -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
        <verbose>true</verbose>
        <source>1.6</source>
        <target>1.6</target>
        <encoding>UTF-8</encoding>
    </configuration>
</plugin>

Now, to run the project from the command line:

mvn exec:java -Dexec.mainClass="com.igalia.wordcount.App"

The output will show:

Hello World!

Adding Hadoop as a dependency

The development of Hadoop is very active. Currently, there are several branches going on. The latest one is 0.24, while the 0.20, 0.22 and 0.23 branches are also still being developed. The current 1.0 version is a fork of 0.20.205. We will use the latest version of the 1.0 branch. Add it as a dependency in the pom.xml:

<dependencies>
    <!-- Hadoop -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>1.0.2</version>
    </dependency>
</dependencies>

Now compile the project:

mvn clean install

Check that the variable M2_REPO is added to the list of accessible libraries:

  • Right-click on Project, select Properties (Alt + Enter)
  • Click on Java Build Path option.
  • Click on the Libraries tab.
  • Click on the Add Variable button.
  • Select variable M2_REPO and click OK.
  • Click OK again to close the window.

Now we should have access to the Hadoop API. Let’s add the following statement to the main method to check whether Eclipse can resolve the import.

// Needs: import org.apache.hadoop.io.IntWritable;
public static void main( String[] args ) {
    IntWritable one = new IntWritable(1);

    System.out.println("This is a one:" + one);
}

Run it:

mvn exec:java -Dexec.mainClass="com.igalia.wordcount.App"

Output:

This is a one:1

NOTE: Pressing Ctrl + Shift + O will make Eclipse add the import statements automatically. If Eclipse still cannot resolve the import, try refreshing the Maven dependencies: right-click on the project and select Maven->Update Dependencies.

Introduction to MapReduce

Remember that Hadoop is two things:

  • A distributed file system (HDFS)
  • An implementation of MapReduce.

MapReduce is a programming model designed for processing large volumes of data in parallel by dividing the work into a set of independent tasks spread across a large number of computers (nodes). It consists of two steps:

  • Map step. The master node takes the input, partitions it into smaller sub-problems, and distributes them to worker nodes. Each worker node processes its smaller problem and passes the answer back to the master node.
  • Reduce step. The master node then collects the answers to all the sub-problems and combines them in some way to form the output.

MapReduce was introduced by Google in 2004 to support distributed computing on large data sets on clusters of computers.

The WordCount example

The WordCount example is the Hello World of MapReduce and Hadoop. It consists of a MapReduce job that counts how many times each word appears in a set of files. The Map function distributes the dataset across many nodes, and each node counts the words in its smaller piece of the dataset. After that, the results are combined by the Reduce function.

Firstly, we will add a new class called WordCount.

public class WordCount extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        // TODO Auto-generated method stub
        return 0;
    }

}

And in the main class, App, create a new Configuration and run the WordCount job through ToolRunner.

public static void main( String[] args ) throws Exception {

    int res = ToolRunner.run(new Configuration(), new WordCount(), args);
    System.exit(res);
}
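
For reference, after this change the whole App class looks roughly like this (the package name comes from the group we chose when creating the project):

package com.igalia.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class App {

    public static void main( String[] args ) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WordCount(), args);
        System.exit(res);
    }
}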

Now, we should implement the MapReduce job inside the class WordCount. In pseudo-code it would be like this:

mapper (filename, file-contents):
   for each word in file-contents:
      emit (word, 1)

reducer (word, values):
   sum = 0
   for each value in values:
      sum = sum + value
   emit (word, sum)

So now we proceed to implement the MapClass class.

public static class MapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            output.collect(word, one);
        }
    }
}

The mapper receives a pair (key, value) and emits another pair (key, value). The input pair is formed by two values: a Long and a Text. The Long is the offset within the file of the Text; the Text is a piece of text from a file. The output pair is formed by a Text and an Int: the Text (the key) is a word and the Int (the value) is a 1. So what we are doing is emitting a 1 for every word in the text received as input. Later, all these output pairs are grouped, sorted and shuffled together in a process called Sort & Shuffle. Each reducer then receives an input pair formed by a key and the list of values emitted for that key.
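
To make the Sort & Shuffle step concrete, this is what the intermediate data could look like for a hypothetical one-line input file:

map input:      (0, "the cat sat on the mat")
map output:     (the, 1) (cat, 1) (sat, 1) (on, 1) (the, 1) (mat, 1)
after shuffle:  (cat, [1]) (mat, [1]) (on, [1]) (sat, [1]) (the, [1, 1])
reduce output:  (cat, 1) (mat, 1) (on, 1) (sat, 1) (the, 2)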

public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

The reducer takes each word together with all the 1s emitted for it, sums them up and emits another pair (key, value) where the key is the word and the value is the number of times that word appears in the text.

Lastly, we need to implement the run method. Generally, this is where the MapReduce job is set up. Since our Mapper and Reducer use the old org.apache.hadoop.mapred API, the job is configured through a JobConf.

public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(MapClass.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path("/tmp/wordcount/in"));
    FileOutputFormat.setOutputPath(conf, new Path("/tmp/wordcount/out"));

    RunningJob job = JobClient.runJob(conf);
    return job.isSuccessful() ? 0 : 1;
}
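
Putting it all together, the complete WordCount class needs a handful of imports. Assuming Hadoop 1.0.2 and the old org.apache.hadoop.mapred API used above, they are roughly the following:

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;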

Place the files whose words you want to count in /tmp/wordcount/in. The output directory /tmp/wordcount/out must not exist before launching the job; Hadoop creates it and will fail if it already exists. After running the job there should be a part-00000 file in /tmp/wordcount/out with the results.
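
For example, if /tmp/wordcount/in contained a single file with the line "hello world hello", the output file would look something like this (TextOutputFormat separates key and value with a tab):

hello	2
world	1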

And this is all for the moment. In the next post I will give a brief introduction to HBase: how to install it, its basic commands and how to interface HBase with Hadoop.

Lastly, you can check out all the code explained in this post here: Hadoop-Word-Count