County Housing Search

From Mar 27, 2012

I recently wrote a job to identify the counties with the most houses for sale (according to the 2010 Census). To do so, I ingested the data from the Census Bureau, and wrote a MapReduce job. My goal was to have all counties and the total houses for sale in ascending order. Then, I could tail the results and see the top 10.

My Mapping Class

package ben.kn.sandbox.census.households;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import ben.kn.sandbox.data.CountyDemographicIndex.CountyGeneralEnum;
import ben.kn.sandbox.data.CountyDemographicIndex.CountyHouseholdEnum;

/**
 * This mapper pulls the total number of homes available for sale in each county
 * (k2) and the county name (v2).
 *
 * @author Ben
 */
@SuppressWarnings("deprecation")
public class HouseholdAvailabilityMapper extends MapReduceBase implements
                Mapper<LongWritable, Text, IntWritable, Text> {

        @Override
        public void map(LongWritable key, Text value, OutputCollector<IntWritable, Text> output,
                        Reporter reporter) throws IOException {
                String[] values = value.toString().split(",");

                // get our desired values.
                String countyName = values[CountyGeneralEnum.NAME.index];

                if (countyName != null && countyName.trim().length() > 0) {
                        String housesForSaleStr = values[CountyHouseholdEnum.HOUSEHOLD_TOTAL_VACANT_UNITS_FOR_SALE.index
                                        + 1];

                        reporter.incrCounter("Job Counters", "CountiesReviewed", 1);

                        Integer totalHousesForSale = 0;

                        try {
                                // be sure a value exists before attempting to process
                                if (housesForSaleStr != null && housesForSaleStr.trim().length() > 0) {
                                        totalHousesForSale = Integer.parseInt(housesForSaleStr.trim());
                                }
                        } catch (NumberFormatException nfe) {
                                // do nothing, just continue processing.
                        }

                        // only output ones that have houses for sale. No sense including
                        // them if they haven't any!
                        if (totalHousesForSale > 0) {
                                reporter.incrCounter("Job Counters", "CountiesAdded", 1);
                                output.collect(new IntWritable(totalHousesForSale), new Text(countyName.trim()));
                        }
                }
        }
}

Notes:

I created a series of enums to parse through the data. It’s stored in a flat file, so I needed an easy way to directly access the fields I wanted. This also makes writing future jobs against the data simpler, because I can quickly access the data using the enums.

Another option would be to write a unique parser with a lot of methods. I would request a piece of data and give it the line, and it would return either null or the value. For example – CountyDataParser.getHouseholdsForSale(value.toString());

You  may also notice that I’m keeping a counter for the total jobs counties counted and included. These appear on the Jobtracker page, and are useful for debugging.

My Driver Class

package ben.kn.sandbox.census.households;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Run the HouseholdAvailability MapReduce job.
 *
 * @author Ben
 */
@SuppressWarnings("deprecation")
public class HouseholdAvailabilityJob extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {

                // Instantiate the JobConf
                JobConf conf = new JobConf(this.getConf(), HouseholdAvailabilityJob.class);
                conf.setJobName(this.getClass().getName());

                FileInputFormat.setInputPaths(conf, new Path(args[0]));
                FileOutputFormat.setOutputPath(conf, new Path(args[1]));

                // Set the Mapper and Reducer classes
                conf.setMapperClass(HouseholdAvailabilityMapper.class);
                conf.setReducerClass(IdentityReducer.class);

                // Set the Key and Value classes which the Mapper will output
                conf.setMapOutputKeyClass(IntWritable.class);
                conf.setMapOutputValueClass(Text.class);

                // Set the Key and Value classes which the Reducer will output
                conf.setOutputKeyClass(IntWritable.class);
                conf.setOutputValueClass(Text.class);

                conf.setNumReduceTasks(1);

                // Start the job
                JobClient.runJob(conf);
                return 0;
        }

        public static void main(String[] args) throws Exception {
                int exitCode = ToolRunner.run(new HouseholdAvailabilityJob(), args);
                System.exit(exitCode);
        }
}

Notes:

You may notice I’m using a stock class, the IdentityReducer. It will directly write the output of the Mapper to the file. I’m also limiting the Reducers to one (conf.setNumReduceTasks(1)), so that the sorted results are in order. If I wanted the results to be in descending order, I could sort them with a Partitioner.