Continuing with the series about Hadoop, this post covers how to connect HBase and Hadoop. This makes it possible, for instance, to feed a MapReduce job from an HBase table or to write MapReduce results to an HBase table.
Taking the Hadoop Word Count example as a starting point, I'm going to change it so that it writes its output to an HBase table instead of to the filesystem.
The first thing is to add the HBase dependency to the pom.xml.
<!-- HBase -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>0.90.5</version>
</dependency>
In my case I'm using HBase 0.90.5, which is compatible with my current version of Hadoop (1.0.2). Before trying a newer version of Hadoop or HBase, check that both versions are compatible with each other. Upgrading either tool may require modifications to the example code below.
I also need to create a table to write the results to.
$ hbase shell
hbase(main):001:0> create 'words', 'number'
The words table will use each unique word as its row key and store the number of repetitions of that word in the number column family.
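Once the job has run, a row can be read back with the standard HBase client API to see this layout. The small snippet below is only an illustration (the word "hadoop" used as row key is an arbitrary example), and it assumes the count was written as an int under an empty qualifier, as the reducer below does:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckWord {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "words");
        // Each row key is a word; the count lives in the "number" column family
        Result result = table.get(new Get(Bytes.toBytes("hadoop")));
        byte[] value = result.getValue(Bytes.toBytes("number"), Bytes.toBytes(""));
        System.out.println("hadoop -> " + Bytes.toInt(value));
        table.close();
    }
}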
Writing the results to an HBase table basically means changing the Reducer task. Other changes are needed too, such as adjusting the job setup.
public static class Reduce extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // One row per word: the word is the row key and the count goes into the
        // "number" column family with an empty qualifier.
        // toBytes comes from a static import of org.apache.hadoop.hbase.util.Bytes.toBytes
        Put put = new Put(toBytes(key.toString()));
        put.add(toBytes("number"), toBytes(""), toBytes(sum));
        context.write(null, put);
    }
}
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

FileInputFormat.setInputPaths(job, new Path("/tmp/wordcount/in"));

TableMapReduceUtil.initTableReducerJob(
        OUTPUT_TABLE,
        WordCount.Reduce.class,
        job);
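For context, a minimal driver putting these pieces together could look like the sketch below. The names WordCount, OUTPUT_TABLE, MapClass and Reduce follow the snippets in this post; the rest of the wiring is an assumption about how the job is set up, not the exact code from the repository.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WordCount {

    static final String OUTPUT_TABLE = "words";

    public static void main(String[] args) throws Exception {
        // HBaseConfiguration picks up hbase-site.xml so the job can reach the cluster
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "wordcount");
        job.setJarByClass(WordCount.class);

        // MapClass here is the usual word count mapper emitting (word, 1) pairs
        job.setMapperClass(WordCount.MapClass.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Input still comes from plain text files in HDFS
        FileInputFormat.setInputPaths(job, new Path("/tmp/wordcount/in"));

        // The reducer writes directly to the 'words' HBase table
        TableMapReduceUtil.initTableReducerJob(OUTPUT_TABLE, WordCount.Reduce.class, job);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}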
Code is available at Hadoop-Word-Count under the remote branch word-count-hbase-write.
Likewise, it's possible to read the input of a MapReduce job from an HBase table and write the results either to the filesystem or to another HBase table. Continuing with this example, I'm going to modify the Mapper class so that it reads its input from an HBase table.
The first thing is to bulk load some files into an HBase table. To ease this step, I created a basic tool called HBaseLoader. To run it:
$ mvn exec:java -Dexec.mainClass=com.igalia.hbaseloader.HBaseLoader -Dtablename=files -Ddir=dir
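The actual loader is in the repository; roughly, the idea is to create one row per file, keyed by the file name, with the file contents stored in a column family. The sketch below only illustrates that idea; the "content" column family and the empty qualifier are assumptions of this sketch, not necessarily what HBaseLoader does, and the target table is assumed to exist already.

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseLoaderSketch {

    public static void main(String[] args) throws IOException {
        String tablename = System.getProperty("tablename", "files");
        File dir = new File(System.getProperty("dir", "."));

        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, tablename);
        for (File file : dir.listFiles()) {
            byte[] content = new byte[(int) file.length()];
            DataInputStream in = new DataInputStream(new FileInputStream(file));
            try {
                in.readFully(content);
            } finally {
                in.close();
            }
            // One row per file: the file name is the row key,
            // the text goes into the 'content' column family
            Put put = new Put(Bytes.toBytes(file.getName()));
            put.add(Bytes.toBytes("content"), Bytes.toBytes(""), content);
            table.put(put);
        }
        table.close();
    }
}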
And this is how the Mapper changes:
public static class MapClass extends TableMapper<Text, IntWritable> {

    @Override
    protected void map(ImmutableBytesWritable key, Result row, Context context)
            throws IOException, InterruptedException {
        // Do stuff
    }
}
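The body of map() depends on how the rows were loaded. A possible implementation, assuming the file text is stored in a "content" column family with an empty qualifier (as in the loader sketch above), would tokenize each row's content and emit (word, 1) pairs just like the original mapper:

// Needs java.util.StringTokenizer and org.apache.hadoop.hbase.util.Bytes
@Override
protected void map(ImmutableBytesWritable key, Result row, Context context)
        throws IOException, InterruptedException {
    // Column family and qualifier names are assumptions of this sketch
    String text = Bytes.toString(row.getValue(Bytes.toBytes("content"), Bytes.toBytes("")));
    StringTokenizer tokenizer = new StringTokenizer(text);
    while (tokenizer.hasMoreTokens()) {
        context.write(new Text(tokenizer.nextToken()), new IntWritable(1));
    }
}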
TableMapReduceUtil.initTableMapperJob(
        INPUT_TABLE,
        scan,
        WordCount.MapClass.class,
        Text.class,
        IntWritable.class,
        job);
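The scan passed to initTableMapperJob is not shown above. A minimal setup that scans the whole input table, with the caching settings usually recommended for MapReduce jobs over HBase, could be created right before that call:

// org.apache.hadoop.hbase.client.Scan; scan every row of the input table
Scan scan = new Scan();
scan.setCaching(500);        // fetch 500 rows per RPC instead of the default of 1
scan.setCacheBlocks(false);  // don't fill the block cache from a full-table MR scan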
You can check this implementation at https://github.com/dpino/Hadoop-Word-Count.git under the word-count-hbase-read-write branch.