欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MapReduce的工作原理及实现案例

程序员文章站 2022-06-05 18:47:47
...

MapReduce的工作原理

1、Client向ResourceManager提交任务申请,RM找到NodeManager并启动一个AppMaster,AM通过获取到的分片信息,向RM申请资源,并启动相应数量的maptask;
2、在maptask上读取文件,由TextInputFormat指定读取规则,调用RecordReader方法按行读取,将行号和每行数据组成文件块进行返回,返回的LongWritable和Text将作为Mapper中map方法的入口数据;
3、每次获取的行的偏移量和每一行内容通过map()方法进行逻辑运算形成新的键值对,通过context对象写到OutputCollector中;
4、OutputCollector把收集到的键值对发送到环形缓冲区,就进入了shuffle过程.环形缓冲区的默认大小为100M,当数据累计到其大小的80%,就会触发溢出;
5、spill溢出前是需要对数据进行分区和排序的,如果没有设置分区器,就会调用系统默认的分区方法,对环形缓冲区的每个键值对hash一个partition值,相同值得分为同一个区,分区数跟NumReduceTasks相关。然后再对Key值进行升序排序;
6、环形缓冲区中分区排序后的数据溢出到文件中,如果map阶段处理的数据量较大,则会溢出多个小文件。如果设置了combiner,相同Key的value值相加,当处理数据量较大时,目的是让尽可能少的数据写入到磁盘,提高运行效率,否则文件中将存在多个相同的Key;
7、多个溢出的小文件会被归并排序为一个大文件,大文件中的数据仍然是分区且分区有序的;
8、当mrAppmaster监控到所有的map task任务完成后,reduce task会根据自己的分区号去每个不同的map task节点上去取相同分区的数据,然后将取过来的数据在进行merge合并成一个大文件,大文件是按照k有序的,此时,shuffle过程结束;
9、在reduce task运算过程中,首先调用groupingComparator对大文件的数据进行分组,每次取出一组键值,通过自定义的reduce()方法进行逻辑处理,然后根据OutPutFormat指定的输出格式将数据写到hdfs文件中。输出文件的个数与reduce个数一致。

WordCount的MapReduce代码实现

public class WordMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    //LongWritable, Text(输入类型,行号和每一行内容),Text, IntWritable(输出类型)
    private Text word = new Text();
    private IntWritable count = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().replaceAll(",|\"|\\.|\\?|!|:|;","").split(" ");
        for (String _word : words) {
            word.set(_word);
            //向reducer传输内容
            context.write(word,count);
        }
    }
}
public class WordReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    private IntWritable count = new IntWritable(0);

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for(IntWritable value:values){
            sum += value.get();
        }
        count.set(sum);
        context.write(key,count);
    }
}
public class WordTest {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //创建HDFS访问路径配置信息
        Configuration config = new Configuration();
        config.set("fs.defaultFS","hdfs://192.168.37.200:9000");
        //创建mapreduce计算任务
        Job job = Job.getInstance(config,"wordCount");
        //设置主类,定位外部jar资源
        job.setJarByClass(WordTest.class);
        //设置任务数:和文件分片数和所存储的DN数有关
        job.setNumReduceTasks(2);
        //设置分区器Partitioner
        job.setPartitionerClass(WordPartitioner.class);
        //设置Combiner
        job.setCombinerClass(WordReducer.class);
        //设置mapper
        job.setMapperClass(WordMapper.class);
        //设置reducer
        job.setReducerClass(WordReducer.class);
        //设置mapper输出键值类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //设置reducer输出键值类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //设置HDFS输入文件绑定job
        FileInputFormat.setInputPaths(job,new Path("/kb10/Pollyanna.txt"));
        FileOutputFormat.setOutputPath(job,new Path("/kb10/wordCount2"));
        //等待所有job步骤完成
        System.out.println(job.waitForCompletion(true));
    }
}