From Mar 27, 2012
I recently wrote a job to identify the counties with the most houses for sale (according to the 2010 Census). To do so, I ingested the data from the Census Bureau, and wrote a MapReduce job. My goal was to have all counties and the total houses for sale in ascending order. Then, I could tail the results and see the top 10.
My Mapping Class
package ben.kn.sandbox.census.households; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import ben.kn.sandbox.data.CountyDemographicIndex.CountyGeneralEnum; import ben.kn.sandbox.data.CountyDemographicIndex.CountyHouseholdEnum; /** * This mapper pulls the total number of homes available for sale in each county * (k2) and the county name (v2). * * @author Ben */ @SuppressWarnings("deprecation") public class HouseholdAvailabilityMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> { @Override public void map(LongWritable key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException { String[] values = value.toString().split(","); // get our desired values. String countyName = values[CountyGeneralEnum.NAME.index]; if (countyName != null && countyName.trim().length() > 0) { String housesForSaleStr = values[CountyHouseholdEnum.HOUSEHOLD_TOTAL_VACANT_UNITS_FOR_SALE.index + 1]; reporter.incrCounter("Job Counters", "CountiesReviewed", 1); Integer totalHousesForSale = 0; try { // be sure a value exists before attempting to process if (housesForSaleStr != null && housesForSaleStr.trim().length() > 0) { totalHousesForSale = Integer.parseInt(housesForSaleStr.trim()); } } catch (NumberFormatException nfe) { // do nothing, just continue processing. } // only output ones that have houses for sale. No sense including // them if they haven't any! if (totalHousesForSale > 0) { reporter.incrCounter("Job Counters", "CountiesAdded", 1); output.collect(new IntWritable(totalHousesForSale), new Text(countyName.trim())); } } } }
Notes:
I created a series of enums to parse through the data. It’s stored in a flat file, so I needed an easy way to directly access the fields I wanted. This also makes writing future jobs against the data simpler, because I can quickly access the data using the enums.
Another option would be to write a unique parser with a lot of methods. I would request a piece of data and give it the line, and it would return either null or the value. For example – CountyDataParser.getHouseholdsForSale(value.toString());
You may also notice that I’m keeping a counter for the total jobs counties counted and included. These appear on the Jobtracker page, and are useful for debugging.
My Driver Class
package ben.kn.sandbox.census.households; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * Run the HouseholdAvailability MapReduce job. * * @author Ben */ @SuppressWarnings("deprecation") public class HouseholdAvailabilityJob extends Configured implements Tool { @Override public int run(String[] args) throws Exception { // Instantiate the JobConf JobConf conf = new JobConf(this.getConf(), HouseholdAvailabilityJob.class); conf.setJobName(this.getClass().getName()); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); // Set the Mapper and Reducer classes conf.setMapperClass(HouseholdAvailabilityMapper.class); conf.setReducerClass(IdentityReducer.class); // Set the Key and Value classes which the Mapper will output conf.setMapOutputKeyClass(IntWritable.class); conf.setMapOutputValueClass(Text.class); // Set the Key and Value classes which the Reducer will output conf.setOutputKeyClass(IntWritable.class); conf.setOutputValueClass(Text.class); conf.setNumReduceTasks(1); // Start the job JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new HouseholdAvailabilityJob(), args); System.exit(exitCode); } }
Notes:
You may notice I’m using a stock class, the IdentityReducer. It will directly write the output of the Mapper to the file. I’m also limiting the Reducers to one (conf.setNumReduceTasks(1)), so that the sorted results are in order. If I wanted the results to be in descending order, I could sort them with a Partitioner.