【Hadoop】MapReduce案例——天气预报案例

【Hadoop】MapReduce案例——天气预报案例,第1张

文章目录
  • 一、前期准备
  • 二、数据准备
  • 三、天气预报案例

一、前期准备

可参考 “词频统计” 案例中的前期准备阶段

二、数据准备

运行下面的程序生成天气数据(共 10000 行,每行格式为“yyyy-MM-dd HH:mm:ss<TAB>温度”,输出到标准输出),将输出保存为 weather.txt 后上传至 HDFS 的 /data 目录

package com.hdtrain;


import javafx.scene.input.DataFormat;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Prints randomly generated weather samples to standard output.
 *
 * <p>Each line has the form {@code yyyy-MM-dd HH:mm:ss<TAB>temperature}. Timestamps are drawn
 * uniformly between 2000-01-01 00:00:00 and 2019-12-31 23:59:59 (default time zone), and
 * temperatures are integers in the range -20 to 39 inclusive.
 */
public class GenerateWeather {
    /** Number of sample lines written per run. */
    private static final int RECORD_COUNT = 10000;

    /**
     * Generates {@link #RECORD_COUNT} sample lines and writes them to {@code System.out}.
     *
     * @param args ignored
     * @throws ParseException if the hard-coded range boundaries cannot be parsed
     */
    public static void main(String[] args) throws ParseException {
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long rangeStart = formatter.parse("2000-01-01 00:00:00").getTime();
        long rangeEnd = formatter.parse("2019-12-31 23:59:59").getTime();
        long rangeMillis = rangeEnd - rangeStart;

        int remaining = RECORD_COUNT;
        while (remaining-- > 0) {
            // Random instant inside [rangeStart, rangeEnd).
            long offset = (long) (Math.random() * rangeMillis);
            String timestamp = formatter.format(new Date(rangeStart + offset));

            // Random whole-degree temperature in [-20, 39].
            int temperature = (int) (Math.random() * 60) - 20;

            System.out.println(timestamp + "\t" + temperature);
        }
    }
}
三、天气预报案例

1.WeatherJob.class

package com.hdtrain;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver that configures and submits the per-month temperature job.
 *
 * <p>The job reads {@code /data/weather.txt}, maps with {@code WeatherMapper}, reduces with
 * {@code WeatherReducer} using two reduce tasks, and writes to a timestamped directory under
 * {@code /results}.
 */
public class WeatherJob {
    /**
     * Builds the job, waits for it to finish, and terminates the JVM with status 1 if it failed.
     *
     * @param args ignored; input and output paths are fixed
     * @throws IOException if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException if the wait for completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration(true);
        // Run with the local job runner instead of submitting to a cluster.
        configuration.set("mapreduce.framework.name", "local");

        Job job = Job.getInstance(configuration);
        job.setJobName("Weather--" + System.currentTimeMillis());
        job.setJarByClass(WeatherJob.class);
        job.setNumReduceTasks(2);

        FileInputFormat.setInputPaths(job, new Path("/data/weather.txt"));
        // The timestamp suffix gives every run a fresh output directory.
        FileOutputFormat.setOutputPath(job, new Path("/results/Weather-" + System.currentTimeMillis()));

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setMapperClass(WeatherMapper.class);
        job.setReducerClass(WeatherReducer.class);

        // waitForCompletion reports failure through its return value, not an exception;
        // surface it as a non-zero exit status so scripts can detect a failed run.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}

②计算每个月份最高的3个温度
代码同上
③计算每年每月温度最高的3天

package com.hdtrain.Weather;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver for the "three hottest days of every year-month" job.
 *
 * <p>The map output key is the composite {@code Weather} object. Records are partitioned by
 * {@code WeatherPartitioner} and grouped for the reducer by {@code WeatherGroupingComparator}.
 * Input is {@code /data/weather.txt}; output goes to a timestamped directory under
 * {@code /results}.
 */
public class WeatherJob {
    /**
     * Builds the job, waits for it to finish, and terminates the JVM with status 1 if it failed.
     *
     * @param args ignored; input and output paths are fixed
     * @throws IOException if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException if the wait for completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration(true);
        // Run with the local job runner instead of submitting to a cluster.
        configuration.set("mapreduce.framework.name", "local");

        Job job = Job.getInstance(configuration);
        job.setJobName("Weather--" + System.currentTimeMillis());
        job.setJarByClass(WeatherJob.class);
        job.setNumReduceTasks(2);
        // Bound the input split size: at most 256 MiB, at least 64 MiB.
        FileInputFormat.setMaxInputSplitSize(job, 256 * 1024 *1024);
        FileInputFormat.setMinInputSplitSize(job, 64 * 1024 *1024);
        // Input file and a per-run (timestamped) output directory.
        FileInputFormat.setInputPaths(job, new Path("/data/weather.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/results/Weather-" + System.currentTimeMillis()));
        // Map output key type.
        job.setMapOutputKeyClass(Weather.class);
        // Map output value type.
        job.setMapOutputValueClass(IntWritable.class);
        // Partitioner that routes map output to reduce tasks.
        job.setPartitionerClass(WeatherPartitioner.class);
        // Grouping comparator that decides which keys share one reduce() call.
        job.setGroupingComparatorClass(WeatherGroupingComparator.class);
        // Mapper implementation.
        job.setMapperClass(WeatherMapper.class);
        // Reducer implementation.
        job.setReducerClass(WeatherReducer.class);

        // waitForCompletion reports failure through its return value, not an exception;
        // surface it as a non-zero exit status so scripts can detect a failed run.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}

2.WeatherMapper.class
①计算每个月份的最高温度

package com.hdtrain;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Maps one {@code timestamp<TAB>temperature} line to a {@code (yyyy-MM, temperature)} pair.
 *
 * <p>Malformed lines are skipped and counted under the {@code Weather/MALFORMED_RECORDS}
 * counter instead of failing the task.
 */
public class WeatherMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /** Length of the leading {@code yyyy-MM} part of the timestamp, used as the output key. */
    private static final int YEAR_MONTH_LENGTH = 7;

    /**
     * Emits the year-month prefix of the timestamp together with the parsed temperature.
     *
     * @param key input key supplied by the input format (unused)
     * @param value one input line, expected as {@code yyyy-MM-dd HH:mm:ss<TAB>temperature}
     * @param context task context used to emit output and update counters
     * @throws IOException if writing the output pair fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        // split() never yields null or an empty array here, so check what is actually
        // indexed below: two fields, and a timestamp long enough to hold "yyyy-MM".
        if (fields.length < 2 || fields[0].length() < YEAR_MONTH_LENGTH) {
            context.getCounter("Weather", "MALFORMED_RECORDS").increment(1);
            return;
        }

        int temperature;
        try {
            // trim() tolerates a trailing '\r' from files with Windows line endings.
            temperature = Integer.parseInt(fields[1].trim());
        } catch (NumberFormatException e) {
            context.getCounter("Weather", "MALFORMED_RECORDS").increment(1);
            return;
        }

        context.write(new Text(fields[0].substring(0, YEAR_MONTH_LENGTH)), new IntWritable(temperature));
    }
}

②计算每个月份最高的3个温度
代码同上
③计算每年每月温度最高的3天

package com.hdtrain.Weather;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

/**
 * Maps one {@code timestamp<TAB>temperature} line to a {@code (Weather, temperature)} pair.
 *
 * <p>The {@code Weather} key carries year, month, day and temperature. Malformed lines are
 * skipped and counted under the {@code Weather/MALFORMED_RECORDS} counter.
 */
public class WeatherMapper extends Mapper<LongWritable, Text, Weather, IntWritable> {
    // SimpleDateFormat and Calendar are costly to create and not thread-safe. They are kept
    // per Mapper instance and reused across records.
    // NOTE(review): assumes map() is invoked from a single thread per instance - confirm if
    // a multithreaded mapper runner is ever configured.
    private final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private final Calendar calendar = Calendar.getInstance();

    /**
     * Parses the line and emits a composite key with the temperature as the value.
     *
     * @param key input key supplied by the input format (unused)
     * @param value one input line, expected as {@code yyyy-MM-dd HH:mm:ss<TAB>temperature}
     * @param context task context used to emit output and update counters
     * @throws IOException if writing the output pair fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        // A line without a tab yields a single field; indexing fields[1] would throw.
        if (fields.length < 2) {
            context.getCounter("Weather", "MALFORMED_RECORDS").increment(1);
            return;
        }

        Date date;
        int degrees;
        try {
            date = timestampFormat.parse(fields[0]);
            // trim() tolerates a trailing '\r' from files with Windows line endings.
            degrees = Integer.parseInt(fields[1].trim());
        } catch (ParseException | NumberFormatException e) {
            // Skip the bad record but keep it visible in the job counters.
            context.getCounter("Weather", "MALFORMED_RECORDS").increment(1);
            return;
        }

        calendar.setTime(date);

        Weather weather = new Weather();
        weather.setYear(calendar.get(Calendar.YEAR));
        // Calendar.MONTH is zero-based; store the conventional 1-12 value.
        weather.setMonth(calendar.get(Calendar.MONTH) + 1);
        weather.setDay(calendar.get(Calendar.DAY_OF_MONTH));
        weather.setTemperature(degrees);

        context.write(weather, new IntWritable(degrees));
    }
}

3.WeatherReducer.class
①计算每个月份的最高温度

package com.hdtrain;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

import static java.lang.Integer.MIN_VALUE;

/**
 * Reduces all temperatures recorded for one key to their maximum.
 */
public class WeatherReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * Writes {@code (key, max(values))}.
     *
     * @param key the grouping key received from the map phase
     * @param values every temperature emitted for {@code key}
     * @param context task context used to emit the result
     * @throws IOException if writing the output pair fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Start below every possible reading so the first value always wins.
        int hottest = Integer.MIN_VALUE;
        for (IntWritable reading : values) {
            hottest = Math.max(hottest, reading.get());
        }

        context.write(key, new IntWritable(hottest));
    }
}

②计算每个月份最高的3个温度

package com.hdtrain;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.stream.Collectors;

import static java.lang.Integer.MIN_VALUE;

/**
 * Reduces all temperatures recorded for one key to its three highest distinct values.
 */
public class WeatherReducer extends Reducer<Text, IntWritable, Text, Text> {
    /**
     * Writes the key with up to three distinct temperatures, highest first, rendered in list
     * form such as {@code [39, 38, 37]}.
     *
     * @param key the grouping key received from the map phase
     * @param values every temperature emitted for {@code key}
     * @param context task context used to emit the result
     * @throws IOException if writing the output pair fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        ArrayList<Integer> readings = new ArrayList<>();
        for (IntWritable value : values) {
            readings.add(value.get());
        }
        readings.sort(Comparator.reverseOrder());

        // Walk the descending list and keep the first three values not seen before.
        ArrayList<Integer> topDistinct = new ArrayList<>();
        for (Integer reading : readings) {
            if (topDistinct.size() == 3) {
                break;
            }
            if (!topDistinct.contains(reading)) {
                topDistinct.add(reading);
            }
        }

        context.write(key, new Text(topDistinct.toString()));
    }
}

③计算每年每月温度最高的3天

package com.hdtrain.Weather;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.stream.Collectors;

import static java.lang.Integer.MIN_VALUE;

/**
 * Emits the three hottest distinct days of one year-month group.
 *
 * <p>{@code Weather.compareTo} orders keys with equal year and month by descending temperature,
 * and {@code WeatherGroupingComparator} groups keys by year and month. When the job is
 * configured with both, the first records seen in a group are therefore its hottest ones.
 */
public class WeatherReducer extends Reducer<Weather, IntWritable, Text, IntWritable> {
    /** Number of distinct days to report per year-month. */
    private static final int TOP_DAYS = 3;

    /**
     * Writes up to {@link #TOP_DAYS} {@code (year-month-day, temperature)} pairs, one per
     * calendar day.
     *
     * @param key composite key; its fields are read again after every {@code iterator.next()}
     * @param values temperatures of the group
     * @param context task context used to emit the results
     * @throws IOException if writing an output pair fails
     * @throws InterruptedException if a write is interrupted
     */
    @Override
    protected void reduce(Weather key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Days already reported for this group. The input has several readings per day, so
        // without this check the same day could fill more than one of the three slots.
        ArrayList<Integer> emittedDays = new ArrayList<>(TOP_DAYS);

        Iterator<IntWritable> iterator = values.iterator();
        while (emittedDays.size() < TOP_DAYS && iterator.hasNext()) {
            IntWritable temperature = iterator.next();
            // Read the day only after next(): Hadoop reuses the key object and updates it to
            // the record that belongs to the value just returned.
            Integer day = key.getDay();
            if (emittedDays.contains(day)) {
                continue;
            }
            emittedDays.add(day);

            Text outputKey = new Text(key.getYear() + "-" + key.getMonth() + "-" + day);
            context.write(outputKey, temperature);
        }
    }
}

Weather.class

package com.hdtrain.Weather;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite MapReduce key holding the date parts and the temperature of one reading.
 *
 * <p>Natural ordering is year ascending, month ascending, temperature <em>descending</em>, then
 * day ascending as a tie-breaker so that the ordering is consistent with {@link #equals}.
 *
 * <p>Fields are stored as primitives: an unset field reads as {@code 0}, and passing
 * {@code null} to a setter throws {@link NullPointerException}.
 */
public class Weather implements WritableComparable<Weather> {
    private int year;
    private int month;
    private int day;
    private int temperature;

    /** Creates an empty instance; Hadoop needs the no-arg constructor for deserialization. */
    public Weather() {
    }

    /** @return the year */
    public Integer getYear() {
        return year;
    }

    /** @return the month as stored by the caller */
    public Integer getMonth() {
        return month;
    }

    /** @return the day of the month */
    public Integer getDay() {
        return day;
    }

    /** @return the temperature */
    public Integer getTemperature() {
        return temperature;
    }

    /** @param year the year; must not be {@code null} */
    public void setYear(Integer year) {
        this.year = year;
    }

    /** @param month the month; must not be {@code null} */
    public void setMonth(Integer month) {
        this.month = month;
    }

    /** @param day the day of the month; must not be {@code null} */
    public void setDay(Integer day) {
        this.day = day;
    }

    /** @param temperature the temperature; must not be {@code null} */
    public void setTemperature(Integer temperature) {
        this.temperature = temperature;
    }

    /**
     * Serializes the four fields as ints in the order year, month, day, temperature.
     *
     * @param dataOutput sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.year);
        dataOutput.writeInt(this.month);
        dataOutput.writeInt(this.day);
        dataOutput.writeInt(this.temperature);
    }

    /**
     * Restores the four fields in the same order {@link #write} emitted them.
     *
     * @param dataInput source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.year = dataInput.readInt();
        this.month = dataInput.readInt();
        this.day = dataInput.readInt();
        this.temperature = dataInput.readInt();
    }

    /**
     * Orders by year, then month, then temperature from high to low, then day.
     *
     * @param o the key to compare with
     * @return a negative, zero or positive value per the {@link Comparable} contract
     */
    @Override
    public int compareTo(Weather o) {
        int result = Integer.compare(this.year, o.year);
        if (result != 0) {
            return result;
        }
        result = Integer.compare(this.month, o.month);
        if (result != 0) {
            return result;
        }
        // Arguments swapped on purpose: hotter readings sort first within a month.
        result = Integer.compare(o.temperature, this.temperature);
        if (result != 0) {
            return result;
        }
        // Deterministic tie-break; keeps compareTo() == 0 equivalent to equals().
        return Integer.compare(this.day, o.day);
    }

    /** Two keys are equal when all four fields match. */
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof Weather)) {
            return false;
        }
        Weather that = (Weather) other;
        return year == that.year
                && month == that.month
                && day == that.day
                && temperature == that.temperature;
    }

    /** Hash over the same four fields used by {@link #equals}. */
    @Override
    public int hashCode() {
        int hash = year;
        hash = 31 * hash + month;
        hash = 31 * hash + day;
        hash = 31 * hash + temperature;
        return hash;
    }

    /** @return {@code year-month-day<TAB>temperature} */
    @Override
    public String toString() {
        return year + "-" + month + "-" + day + "\t" + temperature;
    }
}

WeatherPartitioner.class

package com.hdtrain.Weather;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Assigns a map output record to a reduce task based on its year and month, so that all
 * records of one year-month land in the same partition.
 */
public class WeatherPartitioner extends Partitioner<Weather, IntWritable> {
    /**
     * Computes the partition as {@code (year + month) mod numPartitions}.
     *
     * @param weather the map output key
     * @param intWritable the map output value (unused)
     * @param numPartitions the number of partitions
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(Weather weather, IntWritable intWritable, int numPartitions) {
        // floorMod instead of %: identical for non-negative sums, but never returns a
        // negative index if the sum is negative, which Hadoop would reject.
        return Math.floorMod(weather.getYear() + weather.getMonth(), numPartitions);
    }
}

WeatherGroupingComparator.class

package com.hdtrain.Weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator that treats two {@code Weather} keys as equal when their year and month
 * match; day and temperature are not compared.
 */
public class WeatherGroupingComparator extends WritableComparator {
    /** Registers {@code Weather} as the key class and asks the base class to create instances. */
    public WeatherGroupingComparator() {
        super(Weather.class, true);
    }

    /**
     * Compares by year first and by month only when the years are equal.
     *
     * @param a first key; must be a {@code Weather}
     * @param b second key; must be a {@code Weather}
     * @return negative, zero or positive following the year-then-month ordering
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Weather left = (Weather) a;
        Weather right = (Weather) b;

        int byYear = left.getYear().compareTo(right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        return left.getMonth().compareTo(right.getMonth());
    }
}

4.计算结果
①计算每个月份的最高温度

②计算每个月份最高的3个温度

③计算每年每月温度最高的3天

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/langs/739210.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-04-28
下一篇2022-04-28

发表评论

登录后才能评论

评论列表(0条)

    保存