欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

mapreduce剖析气象站平均气温

程序员文章站 2022-07-14 13:32:01
...

一、气象数据

按行并以 ASCII 格式存储,每一行是一条记录。下图展示了一行采样数据。

mapreduce剖析气象站平均气温

1998            #year
03          #month
09          #day
17          #hour
11           #temperature
-100            #dew
10237           #pressure
60          #wind_direction    
72          #wind_speed
0           #sky_condition    
0           #rain_1h 
-9999           #rain_6h

将每个气象站的数据文件拼接成一个单独的文件更容易处理,预处理过的数据文件示例如下所示:

mapreduce剖析气象站平均气温

1985 07 31 02   200    94 10137   220    26     1     0 -9999
1985 07 31 03   172    94 10142   240     0     0     0 -9999
1985 07 31 04   156    83 10148   260    10     0     0 -9999
1985 07 31 05   133    78 -9999   250     0 -9999     0 -9999
1985 07 31 06   122    72 -9999    90     0 -9999     0     0
1985 07 31 07   117    67 -9999    60     0 -9999     0 -9999
1985 07 31 08   111    61 -9999    90     0 -9999     0 -9999
1985 07 31 09   111    61 -9999    60     5 -9999     0 -9999
1985 07 31 10   106    67 -9999    80     0 -9999     0 -9999
1985 07 31 11   100    56 -9999    50     5 -9999     0 -9999

二、数据导入hdfs

使用myeclipse链接集群hdfs,将数据导入hdfs系统的/home/hadoop/weather目录下(自行创建一个存放数据的目录)。

mapreduce剖析气象站平均气温

mapreduce剖析气象站平均气温

命令行访问

mapreduce剖析气象站平均气温

三、思路分析

1.map 阶段输入 原始数据。选择文本格式作为输入格式,将数据集的每一行作为文本输入。提取气象站和气温信息,并将它们作为输出。

2.reduce阶段接收map 函数的输出结果:每个气象站后面紧跟着一系列气温数据,使用reduce 函数遍历整个列表并统计出平均气温。

四、代码

1.编写 Mapper 类,实现 map() 方法,提取气象站和气温数据。

/*
     * 定义一个Mapper类实现map()方法解析气象站数据
     * input key=偏移量,input value =每行数据
     * output key=weatherStationID, output value =temperature 
     */

    public class TempratureMapper extends Mapper<LongWritable,Text,Text,IntWritable>

    {
        public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException

        { 
            //将每行数据转换成string类型
            String line = value.toString();

            //提取气温值
            int temperature = Integer.parseInt(line.substring(14,19).trim());

            //过滤无效数据
            if(temperature != -9999)

            {
                //获取分片
                FileSplit filesplit = (FileSplit) context.getInputSplit();

                //获取气象站编号
                String weatherStationID  = filesplit.getPath().getName().substring(5, 10);

                //输出处理好的数据
                context.write(new Text(weatherStationID),new IntWritable(temperature));
            }

        }

2.编写reduce函数,统计平均气温

/*
     * 定义一个reducer类实现reduce()方法统计各个气象站的平均气温
     * input key=weatherStationID,input value =temperature
     * output key=weatherStationID, output value =average(temperature) 
     */

    public class TemperatureReducer extends Reducer<Text,IntWritable,Text,IntWritable>

    {
        //自定义成员变量result用以保存输出结果
        private IntWritable result = new IntWritable();

        public void reduce(Text key,Iterable< IntWritable> values,Context context) throws IOException, InterruptedException

        {
            //统计相同气象站的所有气温值
            int sum = 0;
            int count = 0;
            for(IntWritable i : values)
            {
                //对所有气温值累加
                sum += i.get();

                //统计集合大小
                count++;
            }

            //求同一个气象站的平均气温
            result.set(sum/count);

            context.write(key, result);
        }
    }

ps:Iterable:迭代器,就是专门处理集合元素的对象,例如删除和获取集合中的元素。但是该对象比较特殊,不能直接创建对象(通过new),该对象是以内部类的形式存在于每个集合类的内部。

3.编写函数,运行 MapReduce 作业。

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * 
 * @function 统计美国各个气象站30年来的平均气温
 * @author 小讲
 *
 */
public class Temperature extends Configured implements Tool {
/**
     * @function 任务驱动方法
     * @param args
     * @return
     * @throws Exception
     */
    @Override
    public int run(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();//读取配置文件

        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }

        Job job = new Job(conf, "temperature");//新建一个任务
        job.setJarByClass(Temperature.class);// 设置主类

        FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径

        job.setMapperClass(TemperatureMapper.class);// Mapper
        job.setReducerClass(TemperatureReducer.class);// Reducer

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);     
        return job.waitForCompletion(true)?0:1;//提交任务
    }

    /**
     * @function main 方法
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        //数据输入路径和输出路径
        String[] args0 = {
                            "hdfs://pc1:9000/home/hadoop/weather/",
                            "hdfs://pc1:9000/home/hadoop/weather/out/"
                        };
        int ec = ToolRunner.run(new Configuration(), new Temperature(), args0);
        System.exit(ec);
    }

4.对代码进行单元测试及debug调试。

编写mapper测试方法

public class TemperatureMapperTest {
    private Mapper mapper;//定义一个Mapper对象
    private MapDriver driver;//定义一个MapDriver 对象

    @Before
    public void init() {
        mapper = new Temperature.TemperatureMapper();//实例化一个Temperature中的TemperatureMapper对象
        driver = new MapDriver(mapper);//实例化MapDriver对象
    }

    @Test
    public void test() throws IOException {
        //输入一行测试数据
        String line = "1985 07 31 02   200    94 10137   220    26     1     0 -9999";
        driver.withInput(new LongWritable(), new Text(line))//跟TemperatureMapper输入类型一致
                .withOutput(new Text("weatherStationId"), new IntWritable(200))//跟TemperatureMapper输出类型一致
                .runTest();
    }
}

鼠标放在 TemperatureMapperTest 类上右击,选择 Run As ——> JUnit test,运行结果如下图所示:

mapreduce剖析气象站平均气温

左边的对话框里显示”Runs:1/1,Errors:0,Failures:0”,说明 Mapper 测试成功了。

编写reduce测试方法

public class TemperatureReduceTest {
    private Reducer reducer;//定义一个Reducer对象 
    private ReduceDriver driver;//定义一个ReduceDriver对象

    @Before
    public void init() {
        reducer = new Temperature.TemperatureReducer();//实例化一个Temperature中的TemperatureReducer对象
        driver = new ReduceDriver(reducer);//实例化ReduceDriver对象
    }

    @Test
    public void test() throws IOException {
        String key = "weatherStationId";//声明一个key值
        List values = new ArrayList();
        values.add(new IntWritable(200));//添加第一个value值
        values.add(new IntWritable(100));//添加第二个value值
        driver.withInput(new Text("weatherStationId"), values)//跟TemperatureReducer输入类型一致
              .withOutput(new Text("weatherStationId"), new IntWritable(150))//跟TemperatureReducer输出类型一致
              .runTest();
    }
}

mapreduce剖析气象站平均气温

右键 Temperature 项目,选择”Debug As” ——> “java Application”,在程序中打上断点后直接进入调试模式,如下图所示。

mapreduce剖析气象站平均气温

五、运行程序

1.在myeclipse上运行程序

mapreduce剖析气象站平均气温

mapreduce剖析气象站平均气温

mapreduce剖析气象站平均气温

2.将 MapReduce作业打成jar包即temperature.jar,然后然后上传至/home/hadoop/ 目录下,由 hadoop 脚本来执行。

mapreduce剖析气象站平均气温

mapreduce剖析气象站平均气温

mapreduce剖析气象站平均气温

保存为Temp.jar,通过rz命令将Temp.jar上传到、home/hadoop/app/hadoop目录下。执行命令:

[aaa@qq.com hadoop]$ bin/hadoop jar Temp.jar com.pc1.hadoop.test.Temperature home/hadoop/weather/ home/hadoop/weather/out/

最后在 out目录查看结果 ,与本地结果一致。

相关标签: mapreduce