MapReduce for Big Data

In the previous post of my Big Data series we started with a bit of the technical concepts of Hadoop, briefly covered its computing model and explored how it can be used as a plain file storage. This is from a technical point interesting, but storage alone is only half the fun.

So in this follow-up we are going to dive into the world of MapReduce, the unit-test framework MRUnit and a really simple todo example to demonstrate the advantages of the whole idea.

While not strictly required to understand why Hadoop is an excellent choice for storage, it really helps to have a bit of understanding for the course of this article, so if you haven’t read the previous post yet, we will wait - promised:

Ready? So without further ado let us start.

What is MapReduce?

One of the major problems with processing large quantities of data is that the processing of it doesn’t necessarily happen where the data actually is located. This is normally not a problem, but scales really badly and easily reaches physically limits of the underlying hardware e.g. in networking and causes further headaches on any kind of failure.

To circumvent this, the programming paradigm of MapReduce breaks the processing task into two three basic phases:

  • In the mapping phase small Java programs called mappers ingest and convert data according to internal logic into key-value pairs.

  • This data is collected in the shuffling phase, sorted and similar data piped to a single matching reducers if possible.

  • And in the final reducing phase the reducers pick the sorted data up, aggregate and further refine it and ultimately write it to a store like HDFS.

This can be simplified in following flow:

1 One mapper per file converts our JSON into a single key-value pair.
2 The shuffle step ensures that similar pairs are fed into the same reducer.
3 Finally the reducer compacts and combines the key-pairs.

Mapper

In the first step a mapper consumes the data supplied by the various RecordReader implementations of the framework. There exists a broad range of different implementations of the well-known formats like Parquet, Avro and even flat data, so the processing here is quite agnostic of it.

The interesting method here is map, which can be supplemented with optional setup and cleanup steps to prepare and respectively tidy up the stage.

public class TodoMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private Text dueDate = new Text();
    private ObjectMapper mapper = new ObjectMapper();

    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException
    {
        try {
            Todo todo = this.mapper.readValue(value.toString(), Todo.class); (1)

            String formattedDate = todo.getDueDate().getDue()
                    .format(DateTimeFormatter.ofPattern(DueDate.DATE_PATTERN));

            context.write(new Text(formattedDate), new IntWritable(todo.getId())); (2)
        } catch (JsonProcessingException e) {
            /* Do nothing */
        }
    }
}
1 Parse the JSON-string and convert it to a Todo object.
2 After that write the parsed date along with the id back to the context.

Partitioner and Combiner

During the shuffling phase partitioners and combiners implement the logic how the data is distributed between the reducers.

If the partitioner is omitted, the framework just takes the hash of the keys, divides it the number of possible reducers and splits it accordingly. This normally guarantees equal distribution of data, but if there is any requirement to keep specific data together a custom partitioner must be supplied.

A simple example might look like this:

public class TodoPartitioner extends Partitioner<Text, Text> {

    protected int getPartition(Text key, Text value, int numReduceTasks) {
        int id = Integer.parseInt(value);

        if (20 > id) {
            return 1 % numReduceTasks;
        } else {
            return 2 % numReduceTasks;
        }
    }
}

Combiners are quite similar to reducers with the sole difference they are applied directly after the mapping step and before the data is sent to the reducer on the same machine. This allows to aggregate the data directly and there is a good chance that this reduces the data that is sent across the network.

This is surely not an issue in this example, but it might have a huge impact in real scenarios.

Reducer

Analogous to a mapper, a reducer can have a setup and a cleanup step, but since it isn’t strictly required we can skip this here as well and jump to the reduce step where the actual processing happens.

A reducer operates with some assumptions about the input data and this really eases the implementation:

  • They keys are sorted

  • A single reducer is responsible for a single specific key

  • There can be a multitude of values in the supplied key-value pairs

public class TodoReducer extends Reducer<Text, IntWritable, Text, IntArrayWritable> {

    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws java.io.IOException,
            InterruptedException
    {
        List<Integer> idList = new ArrayList<>();

        for (IntWritable value : values) { (1)
            idList.add(value.get());
        }

        context.write(key, new IntArrayWritable(idList.toArray(Integer[]::new))); (2)
    }
}
1 This simply collects all found ids and appends them to a list.
2 When the data is written back to the context the custom class IntArrayWriteable is used, which has been omitted here for brevity.

Now that we have every component in place it is time to talk about how to actually put them to use.

How to run it?

I briefly mentioned the resource manager YARN and its job scheduling capabilities in the previous article which handles all the allocation and control aspects of it.

Before see how we can actually create a job let us first talk about the general job submission flow:

1 Clients submit jobs to the resource manager.
2 MapReduce jobs are started inside of containers and they report their status back to the app master.
3 Node manager inform the resource manager about their status and available resources.
4 And finally the resource manager requests resources from the respective node managers.

Creating a job

Jobs are the actual workhorse of the flows and are supplied as small jar libraries.

The entry point of these libraries is a main function which sets all relevant configuration like mapper, reducer classes, the input or output formats and also the respective paths on the Hadoop cluster.

public class TodoCollect extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        Configuration conf = new Configuration(true);

        Job job = Job.getInstance(conf, "TodoCollect"); (1)

        job.setJarByClass(getClass());

        job.setMapperClass(TodoMapper.class); (2)
        job.setReducerClass(TodoReducer.class);
        job.setNumReduceTasks(1);

        job.setOutputKeyClass(Text.class); (3)
        job.setOutputValueClass(IntArrayWritable.class);

        FileInputFormat.addInputPath(job, inputPath); (4)
        FileOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new TodoCollect(), args);

        System.exit(exitCode);
    }
}
1 The definition of a job is really straight forward.
2 Mapper, reducer and any other steps like combiner can be configured here.
3 The output types must be supplied in order to write the data back to storage.
4 We supply the input and output path via argument.

The compilation of the job files is pretty straight forward:

$ mvn clean package -Dmaven.test.skip=true
...
[INFO] --- jar:3.3.0:jar (default-jar) @ todo-mapreduce ---

[INFO] Building jar: /home/unexist/projects/showcase-hadoop-cdc-quarkus/todo-mapreduce/target/todo-mapreduce-0.1.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  3.241 s
[INFO] Finished at: 2024-01-05T14:50:44+01:00

Before we can actually launch our job we have to create the input path and put some data into it:

$ hadoop fs -mkdir input
$ hadoop fs -put todo.json input/

Once everything is set we can finally launch our job:

$ hadoop jar todo-mapreduce-0.1.jar dev.unexist.showcase.todo.TodoCollect input output

If all goes well, the output directly will be created, and the resulting ids can be found inside a file in the output directory.

Testing with MRUnit

Testing and debugging MapReduce jobs can be quite difficult due to the lack of real output and the distributed nature of the Hadoop stack. Also setting up Hadoop is quite complex for single developers and requires a good amount of computing resources.

This can be eased a bit with the virtualized approaches of MiniDFS or MiniCluster, but fortunately there is a better way, which also plays perfectly well with JUnit.

MRUnit is a Java library that helps developers testing map and reduce tasks independently and with full control of input of output, without the overhead and burden of a running Hadoop cluster locally.

The following example suite demonstrates the different drivers and the execution of the actual tests:

public class TodoMapperReducerTest {
    final static String[] RECORD = {
            "{\"title\":\"string\",\"description\":\"string\",\"done\":false,\"dueDate\":{\"start\":\"2021-05-07\"," +
                    "\"due\":\"2021-05-07\"},\"id\":0}",
            "{\"title\":\"string\",\"description\":\"string\",\"done\":false,\"dueDate\":{\"start\":\"2021-05-07\"," +
                    "\"due\":\"2021-05-07\"},\"id\":1}"
    };

    MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
    ReduceDriver<Text, IntWritable, Text, IntArrayWritable> reduceDriver;
    MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntArrayWritable> mapReduceDriver;

    @Before
    public void setUp() {
        TodoMapper mapper = new TodoMapper();
        TodoReducer reducer = new TodoReducer();

        mapDriver = MapDriver.newMapDriver(mapper); (1)
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
    }

    @Test
    public void shouldVerifyMapper() throws IOException {
        mapDriver.withInput(new LongWritable(), new Text(RECORD[0])); (2)
        mapDriver.withOutput(new Text("2021-05-07"), new IntWritable(0));
        mapDriver.runTest();
    }

    @Test
    public void shouldVerifyReducer() throws IOException {
        reduceDriver.withInput(new Text("2021-05-07"), Arrays.asList( (3)
                        new IntWritable(0), new IntWritable(1)
                )
        );
        reduceDriver.withOutput(new Text("2021-05-07"),
                new IntArrayWritable(new Integer[] { 0, 1 }));
        reduceDriver.runTest();
    }

    @Test
    public void shouldVerfiyMapAndReduce() throws IOException {
        mapReduceDriver.withInput(new LongWritable(), new Text(RECORD[0])); (4)
        mapReduceDriver.withInput(new LongWritable(), new Text(RECORD[1]));

        mapReduceDriver.withOutput(new Text("2021-05-07"),
                new IntArrayWritable(new Integer[] { 0, 1}));
        mapReduceDriver.runTest();
    }
}
1 During the setup phase we create drivers for our different phases.
2 Splitting the phases into single tests allow to check the outputs independently.
3 The reducing phase can be tested accordingly.
4 And everything can also naturally be combined.

Conclusion

The programming model MapReduce allows to split complicated tasks into smaller units and also operates directly on the data instead of moving data to the client.

This works splendidly with bigger files and avoids network congestion, since only the results are sent back to the client.

All examples can be found here:

Bibliography

  • [hadooparch] Mark Grover, Ted Malask, Jonathan Seidman, Gwen Shapira, Hadoop Application Architectures, O’Reilly 2015