欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hadoop学习笔记之初识MapReduce以及WordCount实例分析

程序员文章站 2022-06-30 11:47:12
...

MapReduce简介

MapReduce是什么?

MapReduce是一种编程模型,用于大规模数据集的分布式运算。

Mapreduce基本原理

1、MapReduce通俗解释

图书馆要清点图书数量,有10个书架,管理员为了加快统计速度,找来了10个同学,每个同学负责统计一个书架的图书数量。

张同学统计 书架1

王同学统计 书架2

刘同学统计 书架3

……

过了一会儿,10个同学陆续到管理员这汇报自己的统计数字,管理员把各个数字加起来,就得到了图书总数。

这个过程就可以理解为MapReduce的工作过程。

2、MapReduce中有两个核心操作

(1)map

管理员分配哪个同学统计哪个书架,每个同学都进行相同的“统计”操作,这个过程就是map。

(2)reduce

每个同学的结果进行汇总,这个过程是reduce。

Mapreduce工作过程

Hadoop学习笔记之初识MapReduce以及WordCount实例分析

各个角色实体

1.程序运行时过程设计到的一个角色实体
1.1. Client:编写mapreduce程序,配置作业,提交作业的客户端 ;
1.2. ResourceManager:集群中的资源分配管理 ;
1.3. NodeManager:启动和监管各自节点上的计算资源 ;
1.4. ApplicationMaster:每个程序对应一个AM,负责程序的任务调度,本身也是运行在NM的Container中 ;
1.5. HDFS:分布式文件系统,保存作业的数据、配置信息等等。

2.客户端提交Job
2.1. 客户端编写好Job后,调用Job实例的Submit()或者waitForCompletion()方法提交作业;
2.2. 客户端向ResourceManager请求分配一个Application ID,客户端会对程序的输出、输入路径进行检查,如果没有问题,进行作业输入分片的计算。

3.Job提交到ResourceManager
3.1. 将作业运行所需要的资源拷贝到HDFS中(jar包、配置文件和计算出来的输入分片信息等);
3.2. 调用ResourceManager的submitApplication方法将作业提交到ResourceManager。

4.给作业分配ApplicationMaster
4.1. ResourceManager收到submitApplication方法的调用之后会命令一个NodeManager启动一个Container ;
4.2. 在该NodeManager的Container上启动管理该作业的ApplicationMaster进程。

5. ApplicationMaster初始化作业
5.1. ApplicationMaster对作业进行初始化操作;
5.2. ApplicationMaster从HDFS中获得输入分片信息(map、reduce任务数)

6.任务分配
6.1. ApplicationMaster为其每个map和reduce任务向RM请求计算资源;
6.2. map任务优先于reduce任,map数据优先考虑本地化的数据。

7.任务执行,在 Container 上启动任务(通过YarnChild进程来运行),执行map/reduce任务。

时间先后顺序

1.输入分片(input split)
每个输入分片会让一个map任务来处理,默认情况下,以HDFS的一个块的大小(默认为128M,可以设置)为一个分片。map输出的结果会暂且放在一个环形内存缓冲区中(默认mapreduce.task.io.sort.mb=100M),当该缓冲区快要溢出时(默认mapreduce.map.sort.spill.percent=0.8),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件;

2.map阶段:由我们自己编写,最后调用 context.write(…);

3.partition分区阶段
3.1. 在map中调用 context.write(k2,v2)方法输出,该方法会立刻调用 Partitioner类对数据进行分区,一个分区对应一个 reduce task。
3.2. 默认的分区实现类是 HashPartitioner ,根据k2的哈希值 % numReduceTasks,可能出现“数据倾斜”现象。
3.3. 可以自定义 partition ,调用 job.setPartitioner(…)自己定义分区函数。

4.combiner合并阶段:将属于同一个reduce处理的输出结果进行合并操作
4.1. 是可选的;
4.2. 目的有三个:1.减少Key-Value对;2.减少网络传输;3.减少Reduce的处理。
4.3. combiner操作是有风险的,使用它的原则是combiner的输入不会影响到reduce计算的最终输入,例如:如果计算只是求总数,最大值,最小值可以             使用combiner,但是做平均值计算使用combiner的话,最终的reduce计算结果就会出错。

5.shuffle阶段:即Map和Reduce中间的这个过程
5.1. 首先 map 在做输出时候会在内存里开启一个环形内存缓冲区,专门用来做输出,同时map还会启动一个守护线程;
5.2. 如缓冲区的内存达到了阈值的80%,守护线程就会把内容写到磁盘上,这个过程叫spill,另外的20%内存可以继续写入要写进磁盘的数据;
5.3. 写入磁盘和写入内存操作是互不干扰的,如果缓存区被撑满了,那么map就会阻塞写入内存的操作,让写入磁盘操作完成后再继续执行写入内存操作;
5.4. 写入磁盘时会有个排序操作,如果定义了combiner函数,那么排序前还会执行combiner操作;
5.5. 每次spill操作也就是写入磁盘操作时候就会写一个溢出文件,也就是说在做map输出有几次spill就会产生多少个溢出文件,等map输出全部做完后,map会合并这些输出文件,这个过程里还会有一个Partitioner操作(如上)
5.6. 最后 reduce 就是合并map输出文件,Partitioner会找到对应的map输出文件,然后进行复制操作,复制操作时reduce会开启几个复制线程,这些线程默认个数是5个(可修改),这个复制过程和map写入磁盘过程类似,也有阈值和内存大小,阈值一样可以在配置文件里配置,而内存大小是直接使用reduce的tasktracker的内存大小,复制时候reduce还会进行排序操作和合并文件操作,这些操作完了就会进行reduce计算了。

6.reduce阶段:由我们自己编写,最终结果存储在hdfs上的。

Mapreduce实例—WordCount单词统计

Hadoop中的WordCount源码解析

参考Hadoop学习笔记:MapReduce框架详解(很详细, 必须看)
不再详述

自定义WordCount

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


 class WordCountMap extends Mapper<Object, Text, Text, IntWritable>{
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
//得到每一行数据
        String line = value.toString();
//通过空格分割,对这一行数据进行拆分 
        String [] words = line.split(" ");
 //循环遍历 输出
        for(String word:words){
          context.write(new Text(word), new IntWritable(1));
      }

    }
 }

  class WordCountReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
     Integer count = 0;
     for(IntWritable value: values ){
         count +=value.get();
         }
     context.write(key, new IntWritable(count));
     }
  }

  public class WordCount  extends Mapper<Object, Text, Text, IntWritable>{
  public static void main(String[] args) throws Exception {
   //创建配置对象
      Configuration conf = new Configuration();
    //创建job对象
      Job job = new Job(conf, "word count");
    //设置运行的job类
      job.setJarByClass(WordCount.class);
    //设置mapper类
      job.setMapperClass(WordCountMap.class);
      //设置reducer类
    job.setReducerClass(WordCountReducer.class);

    //设置map 输出的key value
    job.setMapOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    //设置reduce 输出的key value
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    //设置输入输出的路径
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    //提交job
    boolean b = job.waitForCompletion(true);
    if(!b){
    System.out.println("wordcount task fail!");
    }

  }
}

运行时出现的错误以及解决方案

  1. 做mapreduce计算时候,输出一般是一个文件夹,而且该文件夹是不能存在,我在出面试题时候提到了这个问题,而且这个检查做的很早,当我们提交job时候就会进行,mapreduce之所以这么设计是保证数据可靠性,如果输出目录存在reduce就搞不清楚你到底是要追加还是覆盖,不管是追加和覆盖操作都会有可能导致最终结果出问题,mapreduce是做海量数据计算,一个生产计算的成本很高,例如一个job完全执行完可能要几个小时,因此一切影响错误的情况mapreduce是零容忍的。

2.eclipse中编译的jdk与集群上的jdk版本不同会报错, 解决方案: 工程上右键->properties->java Bulid Path-> Libraries ->移除原来的Jar System ->add Library ->添加与集群相同的jdk

参考:

1.7个实例全面掌握Hadoop MapReduce
2.Hadoop学习笔记—4.初识MapReduce
3.初学Hadoop之图解MapReduce与WordCount示例分析
4.Hadoop学习笔记:MapReduce框架详解(很详细, 必须看)