I need to solve the Word Count problem using Hadoop Streaming, following the instructions below:
1. In class we wrote a MapReduce program in Java to compute the word counts for any given input. In this assignment, you will solve the same problem again, this time using Hadoop Streaming.
2. Create two Python scripts, wordcount_map.py and wordcount_reduce.py, to be used as the mapper and reducer of the streaming job.
3. Your script files must be executable (consider the chmod command) and must include the necessary shebang line (as in the attached script files).
4. Attached are the script files we used in class to demonstrate Hadoop Streaming: maxtemp_map.py and maxtemp_reduce.py. They can help you get started.
5. Recall the streaming command:
$ mapred streaming \
-files
-mapper
-reducer
-input
-output
(extra options: -combiner, -numReduceTasks, etc.)
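One way to approach the two scripts is sketched below. It is a starting point under stated assumptions, not a reference solution: words are taken to be whitespace-separated tokens, counting is case-sensitive, and punctuation is not stripped. The function names map_lines and reduce_lines are made up for illustration; in the assignment the mapper half would go in wordcount_map.py and the reducer half in wordcount_reduce.py, each reading sys.stdin and printing its output.

```python
#!/usr/bin/env python
# Sketch only: both halves are shown in one file so they can be tested locally.
# In the assignment they would be split into wordcount_map.py and
# wordcount_reduce.py, each reading sys.stdin and printing its results.
from itertools import groupby


def map_lines(lines):
    """Mapper half: emit 'word<TAB>1' for every whitespace-separated token."""
    for line in lines:
        for word in line.split():
            yield "%s\t%d" % (word, 1)


def reduce_lines(lines):
    """Reducer half: sum the counts of each word.

    Assumes the lines arrive sorted by key, as they do after the
    streaming shuffle, so equal words are adjacent.
    """
    def parse(raw):
        for line in raw:
            word, sep, count = line.rstrip("\n").partition("\t")
            if sep:  # skip malformed lines that have no tab
                yield word, int(count)

    for word, group in groupby(parse(lines), key=lambda pair: pair[0]):
        yield "%s\t%d" % (word, sum(count for _, count in group))


if __name__ == "__main__":
    # Local stand-in for the shuffle: sorted() plays the role of Hadoop's sort.
    sample = ["the cat saw the dog\n", "the end\n"]
    for out in reduce_lines(sorted(map_lines(sample))):
        print(out)
```

Because the reducer only relies on equal keys being adjacent, the same logic can be checked without a cluster by sorting the mapper output locally before feeding it to the reducer.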
The MaxTemperature example files are the programs discussed in class.
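#!/usr/bin/env python
import re
import sys

for line in sys.stdin:
    val = line.strip()
    # Fixed-width fields: year, air temperature, quality code
    year, temp, q = val[15:19], val[87:92], val[92:93]
    # Skip missing readings (+9999) and readings with a bad quality code
    if (temp != "+9999" and re.match("[01459]", q)):
        print("%s\t%s" % (year, temp))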
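The mapper above emits year and temperature separated by a tab, so the matching reducer has to take the maximum per year. The following is a sketch written for illustration, not the attached maxtemp_reduce.py; the function name max_by_key is made up, and it assumes the input lines arrive sorted by key, as they do after the streaming shuffle.

```python
#!/usr/bin/env python
# Sketch of a streaming-style "max per key" reducer (illustrative only).


def max_by_key(lines):
    """Yield (key, max value) for tab-separated lines sorted by key."""
    last_key, max_val = None, None
    for line in lines:
        key, sep, val = line.strip().partition("\t")
        if not sep:  # skip malformed lines that have no tab
            continue
        if last_key is not None and key != last_key:
            # Key changed: the previous key's group is complete.
            yield last_key, max_val
            max_val = None
        last_key = key
        # int() accepts a leading sign, so "+0022" parses as 22.
        max_val = int(val) if max_val is None else max(max_val, int(val))
    if last_key is not None:
        yield last_key, max_val  # flush the final group


if __name__ == "__main__":
    sample = ["1949\t+0111\n", "1949\t+0078\n", "1950\t-0011\n", "1950\t+0022\n"]
    for year, temp in max_by_key(sample):
        print("%s\t%s" % (year, temp))
```

In an actual reducer script the generator would be fed sys.stdin instead of the sample list. The word count reducer follows the same key-change pattern, with a running sum in place of the running maximum.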
Mapper for the maximum temperature example:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    // Emit only valid readings: not missing, and with an accepted quality code
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
Reducer for the max temperature example
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {

    // Track the largest temperature seen for this year
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}
Application to find the maximum temperature in the weather dataset
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}