
- 一、前期准备
- 二、数据准备
- 三、天气预报案例
一、前期准备
可参考“词频统计”案例中的前期准备阶段。
二、数据准备
运行下面的程序生成天气数据,并将输出结果上传至 HDFS。
package com.hdtrain;
import javafx.scene.input.DataFormat;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Prints 10000 random weather samples to standard output, one per line, in the
 * form {@code yyyy-MM-dd HH:mm:ss<TAB>temperature}.
 *
 * <p>Timestamps fall between 2000-01-01 00:00:00 (inclusive) and
 * 2019-12-31 23:59:59 (exclusive); temperatures are integers from -20 to 39.
 */
public class GenerateWeather {
    /**
     * Generates the samples.
     *
     * @param args ignored
     * @throws ParseException if one of the hard-coded boundary timestamps cannot be parsed
     */
    public static void main(String[] args) throws ParseException {
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long rangeStart = formatter.parse("2000-01-01 00:00:00").getTime();
        long rangeLength = formatter.parse("2019-12-31 23:59:59").getTime() - rangeStart;

        int remaining = 10000;
        while (remaining-- > 0) {
            // Random instant inside the range; Math.random() is in [0, 1).
            long offset = (long) (Math.random() * rangeLength);
            // Random integer in [0, 59] shifted down to [-20, 39].
            int temperature = (int) (Math.random() * 60) - 20;
            String timestamp = formatter.format(new Date(rangeStart + offset));
            System.out.println(timestamp + "\t" + temperature);
        }
    }
}
三、天气预报案例
1.WeatherJob.class
①计算每个月份的最高温度
package com.hdtrain;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver that configures and runs the weather MapReduce job built from
 * {@code WeatherMapper} and {@code WeatherReducer}.
 *
 * <p>Input is read from {@code /data/weather.txt}; results go to a
 * time-stamped directory under {@code /results}. The map output is declared as
 * {@code Text} keys with {@code IntWritable} values.
 */
public class WeatherJob {
    /**
     * Builds the job, waits for it to finish and reports failure through the
     * process exit status.
     *
     * @param args ignored
     * @throws IOException if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException if the thread is interrupted while waiting for the job
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration(true);
        // Run with the "local" MapReduce framework rather than a cluster.
        configuration.set("mapreduce.framework.name", "local");

        Job job = Job.getInstance(configuration);
        job.setJobName("Weather--" + System.currentTimeMillis());
        job.setJarByClass(WeatherJob.class);
        job.setNumReduceTasks(2);

        // Input file and a unique (time-stamped) output directory.
        FileInputFormat.setInputPaths(job, new Path("/data/weather.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/results/Weather-" + System.currentTimeMillis()));

        // Types emitted by the mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setMapperClass(WeatherMapper.class);
        job.setReducerClass(WeatherReducer.class);

        // waitForCompletion returns false when the job fails; surface that as a
        // non-zero exit status instead of silently ending with status 0.
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            System.exit(1);
        }
    }
}
②计算每个月份最高的3个温度
代码同上
③计算每年每月温度最高的3天
package com.hdtrain.Weather;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver that configures and runs the secondary-sort weather job: composite
 * {@code Weather} keys, a custom partitioner and a custom grouping comparator.
 *
 * <p>Input is read from {@code /data/weather.txt}; results go to a
 * time-stamped directory under {@code /results}.
 */
public class WeatherJob {
    /**
     * Builds the job, waits for it to finish and reports failure through the
     * process exit status.
     *
     * @param args ignored
     * @throws IOException if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException if the thread is interrupted while waiting for the job
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration(true);
        // Run with the "local" MapReduce framework rather than a cluster.
        configuration.set("mapreduce.framework.name", "local");

        Job job = Job.getInstance(configuration);
        job.setJobName("Weather--" + System.currentTimeMillis());
        job.setJarByClass(WeatherJob.class);
        job.setNumReduceTasks(2);

        // Input split size bounds: at most 256 MiB, at least 64 MiB.
        FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
        FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);

        // Input file and a unique (time-stamped) output directory.
        FileInputFormat.setInputPaths(job, new Path("/data/weather.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/results/Weather-" + System.currentTimeMillis()));

        // Types emitted by the mapper.
        job.setMapOutputKeyClass(Weather.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Partitioner that routes keys to reducers.
        job.setPartitionerClass(WeatherPartitioner.class);
        // Grouping comparator that decides which keys share one reduce() call.
        job.setGroupingComparatorClass(WeatherGroupingComparator.class);

        job.setMapperClass(WeatherMapper.class);
        job.setReducerClass(WeatherReducer.class);

        // waitForCompletion returns false when the job fails; surface that as a
        // non-zero exit status instead of silently ending with status 0.
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            System.exit(1);
        }
    }
}
2.WeatherMapper.class
①计算每个月份的最高温度
package com.hdtrain;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Maps one input line of the form {@code yyyy-MM-dd HH:mm:ss<TAB>temperature}
 * to a {@code (yyyy-MM, temperature)} pair.
 *
 * <p>Lines that do not have two tab-separated fields, whose first field is
 * shorter than seven characters, or whose temperature is not an integer are
 * skipped and counted in the {@code Weather / MALFORMED_RECORDS} counter
 * instead of failing the task.
 */
public class WeatherMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /** Length of the leading {@code yyyy-MM} part of the timestamp used as the output key. */
    private static final int MONTH_PREFIX_LENGTH = 7;

    /**
     * Emits the month prefix of the timestamp as key and the parsed temperature as value.
     *
     * @param key input key supplied by the input format (unused)
     * @param value one line of the weather file
     * @param context task context used to emit output and update counters
     * @throws IOException if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        // String.split never returns null or an empty array, so the checks that
        // matter are the field count and the length needed by substring below.
        if (fields.length < 2 || fields[0].length() < MONTH_PREFIX_LENGTH) {
            context.getCounter("Weather", "MALFORMED_RECORDS").increment(1);
            return;
        }

        int temperature;
        try {
            // trim() tolerates stray whitespace around the number.
            temperature = Integer.parseInt(fields[1].trim());
        } catch (NumberFormatException e) {
            context.getCounter("Weather", "MALFORMED_RECORDS").increment(1);
            return;
        }

        String month = fields[0].substring(0, MONTH_PREFIX_LENGTH);
        context.write(new Text(month), new IntWritable(temperature));
    }
}
②计算每个月份最高的3个温度
代码同上
③计算每年每月温度最高的3天
package com.hdtrain.Weather;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
/**
 * Maps one input line of the form {@code yyyy-MM-dd HH:mm:ss<TAB>temperature}
 * to a {@code (Weather, temperature)} pair, where the {@code Weather} key
 * carries the year, month, day and temperature of the reading.
 *
 * <p>Lines with fewer than two tab-separated fields, an unparsable timestamp
 * or a non-integer temperature are skipped and counted in the
 * {@code Weather / MALFORMED_RECORDS} counter.
 */
public class WeatherMapper extends Mapper<LongWritable, Text, Weather, IntWritable> {
    /**
     * Parser for the timestamp column. Kept as an instance field (not static)
     * because {@link SimpleDateFormat} is not thread-safe; this avoids
     * building a new formatter for every record.
     */
    private final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * Parses the line and emits the composite key with the temperature as value.
     *
     * @param key input key supplied by the input format (unused)
     * @param value one line of the weather file
     * @param context task context used to emit output and update counters
     * @throws IOException if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        if (fields.length < 2) {
            context.getCounter("Weather", "MALFORMED_RECORDS").increment(1);
            return;
        }

        Date date;
        int degrees;
        try {
            date = timestampFormat.parse(fields[0]);
            // trim() tolerates stray whitespace around the number.
            degrees = Integer.parseInt(fields[1].trim());
        } catch (ParseException | NumberFormatException e) {
            // Skip the bad line but keep it visible through a job counter.
            context.getCounter("Weather", "MALFORMED_RECORDS").increment(1);
            return;
        }

        Calendar calendar = Calendar.getInstance();
        calendar.setTime(date);

        Weather weather = new Weather();
        weather.setYear(calendar.get(Calendar.YEAR));
        // Calendar.MONTH is zero-based; store the conventional 1-12 value.
        weather.setMonth(calendar.get(Calendar.MONTH) + 1);
        weather.setDay(calendar.get(Calendar.DAY_OF_MONTH));
        weather.setTemperature(degrees);

        context.write(weather, new IntWritable(degrees));
    }
}
3.WeatherReducer.class
①计算每个月份的最高温度
package com.hdtrain;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
import static java.lang.Integer.MIN_VALUE;
/**
 * Reduces all temperatures received for one key to their maximum and emits
 * {@code (key, maximum)}.
 */
public class WeatherReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * Writes the largest value of {@code values} for {@code key}.
     *
     * @param key the grouping key received from the map phase
     * @param values the temperatures recorded under that key
     * @param context task context used to emit the result
     * @throws IOException if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Start below every possible reading so the first value always wins.
        int highest = MIN_VALUE;
        for (IntWritable reading : values) {
            highest = Math.max(highest, reading.get());
        }
        context.write(key, new IntWritable(highest));
    }
}
②计算每个月份最高的3个温度
package com.hdtrain;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.stream.Collectors;
import static java.lang.Integer.MIN_VALUE;
/**
 * Reduces all temperatures received for one key to the three highest distinct
 * values and emits them as the text form of a list, e.g. {@code [39, 38, 37]}.
 */
public class WeatherReducer extends Reducer<Text, IntWritable, Text, Text> {
    /**
     * Writes the top three distinct temperatures for {@code key}, highest first.
     *
     * @param key the grouping key received from the map phase
     * @param values the temperatures recorded under that key
     * @param context task context used to emit the result
     * @throws IOException if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Copy the primitive values out of the iterated writables first.
        ArrayList<Integer> temperatures = new ArrayList<>();
        for (IntWritable reading : values) {
            temperatures.add(reading.get());
        }

        String topThree = temperatures.stream()
                .distinct()
                .sorted(Comparator.reverseOrder())
                .limit(3)
                .collect(Collectors.toList())
                .toString();

        context.write(key, new Text(topThree));
    }
}
③计算每年每月温度最高的3天
package com.hdtrain.Weather;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.stream.Collectors;
import static java.lang.Integer.MIN_VALUE;
/**
 * Emits, for each reduce group, up to three {@code (year-month-day, temperature)}
 * records, one per distinct day, in the order the values arrive.
 *
 * <p>Assumes the job groups keys by year and month and delivers the values
 * sorted by temperature descending (the ordering defined by the key's
 * {@code compareTo} plus a year/month grouping comparator), so the first
 * record seen for a day is that day's hottest reading.
 *
 * <p>A day with several readings is emitted only once. Without that check the
 * "three hottest days" could list the same day up to three times.
 */
public class WeatherReducer extends Reducer<Weather, IntWritable, Text, IntWritable> {
    /** Maximum number of distinct days written per group. */
    private static final int TOP_DAYS = 3;

    /**
     * Writes the first reading of each of the first {@value #TOP_DAYS} distinct days in the group.
     *
     * @param key composite key; read inside the loop so that it describes the record currently being iterated
     * @param values temperatures of the group, in sorted order
     * @param context task context used to emit the result
     * @throws IOException if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Weather key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // At most TOP_DAYS entries, so a linear contains() is cheap.
        ArrayList<Integer> emittedDays = new ArrayList<>(TOP_DAYS);

        for (IntWritable temperature : values) {
            Integer day = key.getDay();
            if (emittedDays.contains(day)) {
                // A hotter (or equal) reading for this day was already written.
                continue;
            }

            Text outputKey = new Text(key.getYear() + "-" + key.getMonth() + "-" + day);
            context.write(outputKey, temperature);
            emittedDays.add(day);

            if (emittedDays.size() == TOP_DAYS) {
                return;
            }
        }
    }
}
Weather.class
package com.hdtrain.Weather;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Composite MapReduce key describing one temperature reading: the year, month
 * and day it was taken on, plus the temperature itself.
 *
 * <p>Natural ordering is year ascending, then month ascending, then
 * temperature descending. The day does not take part in the ordering.
 */
public class Weather implements WritableComparable<Weather> {
    private Integer year;
    private Integer month;
    private Integer day;
    private Integer temperature;

    /** Creates an empty key; every field is {@code null} until set or read. */
    public Weather() {
    }

    /** @return the year of the reading */
    public Integer getYear() {
        return year;
    }

    /** @param year the year of the reading */
    public void setYear(Integer year) {
        this.year = year;
    }

    /** @return the month of the reading */
    public Integer getMonth() {
        return month;
    }

    /** @param month the month of the reading */
    public void setMonth(Integer month) {
        this.month = month;
    }

    /** @return the day of the reading */
    public Integer getDay() {
        return day;
    }

    /** @param day the day of the reading */
    public void setDay(Integer day) {
        this.day = day;
    }

    /** @return the temperature of the reading */
    public Integer getTemperature() {
        return temperature;
    }

    /** @param temperature the temperature of the reading */
    public void setTemperature(Integer temperature) {
        this.temperature = temperature;
    }

    /**
     * Serializes the four fields as ints in the order year, month, day, temperature.
     *
     * @param dataOutput destination of the serialized form
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(year);
        dataOutput.writeInt(month);
        dataOutput.writeInt(day);
        dataOutput.writeInt(temperature);
    }

    /**
     * Restores the four fields, reading them in the same order {@link #write} emits them.
     *
     * @param dataInput source of the serialized form
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        year = dataInput.readInt();
        month = dataInput.readInt();
        day = dataInput.readInt();
        temperature = dataInput.readInt();
    }

    /**
     * Orders by year, then month, then temperature from highest to lowest.
     *
     * @param o the key to compare against
     * @return a negative, zero or positive value as this key sorts before, with or after {@code o}
     */
    @Override
    public int compareTo(Weather o) {
        int byYear = year.compareTo(o.getYear());
        if (byYear != 0) {
            return byYear;
        }

        int byMonth = month.compareTo(o.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }

        // Same year and month: negate so that higher temperatures sort first.
        return -temperature.compareTo(o.getTemperature());
    }
}
WeatherPartitioner.class
package com.hdtrain.Weather;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Assigns a {@code Weather} key to a partition from the sum of its year and
 * month, so all records of one year and month go to the same partition.
 */
public class WeatherPartitioner extends Partitioner<Weather, IntWritable> {
    /**
     * Computes the partition index for a record.
     *
     * @param weather the map output key
     * @param intWritable the map output value (unused)
     * @param numPartitions total number of partitions
     * @return an index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(Weather weather, IntWritable intWritable, int numPartitions) {
        // floorMod instead of %: the result stays non-negative even if the sum
        // of year and month is negative. For non-negative sums it equals %.
        return Math.floorMod(weather.getYear() + weather.getMonth(), numPartitions);
    }
}
WeatherGroupingComparator.class
package com.hdtrain.Weather;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Comparator over {@code Weather} keys that looks only at year and month, so
 * keys from the same year and month compare as equal regardless of day or
 * temperature.
 */
public class WeatherGroupingComparator extends WritableComparator {
    /** Registers {@code Weather} as the key class and asks the base class to create key instances. */
    public WeatherGroupingComparator() {
        super(Weather.class, true);
    }

    /**
     * Compares two {@code Weather} keys by year, then by month.
     *
     * @param a first key; must be a {@code Weather}
     * @param b second key; must be a {@code Weather}
     * @return a negative, zero or positive value as {@code a} sorts before, with or after {@code b}
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Weather left = (Weather) a;
        Weather right = (Weather) b;

        int byYear = left.getYear().compareTo(right.getYear());
        // The month only matters when the years are the same.
        return byYear != 0 ? byYear : left.getMonth().compareTo(right.getMonth());
    }
}
4.计算结果
①计算每个月份的最高温度
②计算每个月份最高的3个温度
③计算每年每月温度最高的3天
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)