欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

MapReduce编程实战之WordCount简单案例分析

程序员文章站 2022-04-03 17:52:46
<2>MapReduce任务执行过程 看图说话! MapReduce 特点 MapReduce 为什么如此受欢迎?尤其现在互联网+时代,互联网+公司都在使用MapRedu...
<2>MapReduce任务执行过程

MapReduce编程实战之WordCount简单案例分析

看图说话!

MapReduce 特点

MapReduce 为什么如此受欢迎?尤其现在互联网+时代,互联网+公司都在使用MapReduce。MapReduce 之所以如此受欢迎,它主要有以下几个特点。

MapReduce 易于编程

它简单的实现一些接口,就可以完成一个分布式程序,这个分布

式程序可以分布到大量廉价的 PC 机器运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得 MapReduce 编程变得非常流行。

2、良好的 扩展性 。当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展

它的计算能力。

3、 高容错性 。MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求

它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上面上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由

Hadoop 内部完成的。

4、适合 PB 级以上海量数据的 离线处理 。这里加红字体离线处理,说明它适合离线处理而不

适合在线处理。比如像毫秒级别的返回一个结果,MapReduce 很难做到。

 

MapReduce 虽然具有很多的优势,但是它也有不擅长的地方。这里的不擅长不代表它不能做,而是在有些场景下实现的效果差,并不适合 MapReduce 来处理,主要表现在以下几个方面:

1、实时计算。MapReduce 无法像 Mysql 一样,在毫秒或者秒级内返回结果

2、流式计算。流式计算的输入数据时动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。

3、DAG(有向图)计算。多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce 并不是不能做,而是使用后,每个MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

代码实例

package?com.wgy.mapreduce.wc;

import?java.io.IOException;

import?java.util.StringTokenizer;

import?org.apache.hadoop.conf.Configuration;

import?org.apache.hadoop.fs.FileSystem;

import?org.apache.hadoop.fs.Path;

import?org.apache.hadoop.io.IntWritable;

import?org.apache.hadoop.io.LongWritable;

import?org.apache.hadoop.io.Text;

import?org.apache.hadoop.mapreduce.Job;

import?org.apache.hadoop.mapreduce.Mapper;

import?org.apache.hadoop.mapreduce.Reducer;

import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public?class?Wordcount {

public?static?class?TokenizerMapper extends

Mapper,>

//这个Mapper类是一个泛型类型,它有四个形参类型,分别指定map函数的输入键、输入值、输出键、输出值的类型。hadoop没有直接使用Java内嵌的类型,而是自己开发了一套可以优化网络序列化传输的基本类型。这些类型都在org.apache.hadoop.io包中。

//比如这个例子中的Object类型,适用于字段需要使用多种类型的时候,Text类型相当于Java中的String类型,IntWritable类型相当于Java中的Integer类型

{

//定义两个变量

//private final static LongWritable one=new LongWritable(1);

private?final?static?IntWritable one?= new?IntWritable(1);//这个1表示每个单词出现一次,map的输出value就是1.

private?Text word?= new?Text();//每行数据

//实现map函数

public?void?map(Object key, Text value, Context context)

//context它是mapper的一个内部类,简单的说*接口是为了在map或是reduce任务中跟踪task的状态,很自然的MapContext就是记录了map执行的上下文,在mapper类中,这个context可以存储一些job conf的信息,比如job运行时参数等,我们可以在map函数中处理这个信息,这也是Hadoop中参数传递中一个很经典的例子,同时context作为了map和reduce执行中各个函数的一个桥梁,这个设计和Java web中的session对象、application对象很相似

//简单的说context对象保存了作业运行的上下文信息,比如:作业配置信息、InputSplit信息、任务ID等

//我们这里最直观的就是主要用到context的write方法。

throws?IOException, InterruptedException {

//The tokenizer?uses the default delimiter set, which is " \t\n\r": the space character, the tab character, the newline character, the carriage-return character

String line= value.toString(); ??// 将输入的纯文本文件的数据转化成String

?// 将输入的数据首先按行进行分割

StringTokenizer itr?= new?StringTokenizer(line);//将Text类型的value转化成字符串类型

//StringTokenizer是字符串分隔解析类型,StringTokenizer 用来分割字符串,你可以指定分隔符,比如',',或者空格之类的字符。

while?(itr.hasMoreTokens()) {//hasMoreTokens() 方法是用来测试是否有此标记生成器的字符串可用更多的标记。

//java.util.StringTokenizer.hasMoreTokens()

word.set(itr.nextToken());//nextToken()这是 StringTokenizer 类下的一个方法,nextToken() 用于返回下一个匹配的字段。

context.write(word, one);

}

}

}

public?static?class?IntSumReducer extends

Reducer {,>

private?IntWritable result?= new?IntWritable();

//实现reduce函数

public?void?reduce(Text key, Iterable values,

Context context) throws?IOException, InterruptedException {

int?sum?= 0;

for?(IntWritable val?: values) {

sum?+= val.get();

}

result.set(sum);

context.write(key, result);

}

?}

public?static?void?main(String[] args) throws?Exception {

Configuration conf?= new?Configuration();

//Configuration类代表作业的配置,该类会加载mapred-site.xml、hdfs-site.xml、core-site.xml等配置文件。

//删除已经存在的输出目录

Path mypath?= new?Path("hdfs://cdh001:8020/user/wordcount-out");//输出路径

FileSystem hdfs?= mypath.getFileSystem(conf);//获取文件系统

//如果文件系统中存在这个输出路径,则删除掉,保证输出目录不能提前存在。

if?(hdfs.isDirectory(mypath)) {

hdfs.delete(mypath, true);

}

//job对象指定了作业执行规范,可以用它来控制整个作业的运行。

Job job?= Job.getInstance();// new Job(conf, "word count");

job.setJarByClass(Wordcount.class);//我们在hadoop集群上运行作业的时候,要把代码打包成一个jar文件,然后把这个文件

//传到集群上,然后通过命令来执行这个作业,但是命令中不必指定JAR文件的名称,在这条命令中通过job对象的setJarByClass()

//中传递一个主类就行,hadoop会通过这个主类来查找包含它的JAR文件。

job.setMapperClass(TokenizerMapper.class);

//job.setReducerClass(IntSumReducer.class);

job.setCombinerClass(IntSumReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

//一般情况下mapper和reducer的输出的数据类型是一样的,所以我们用上面两条命令就行,如果不一样,我们就可以用下面两条命令单独指定mapper的输出key、value的数据类型

//job.setMapOutputKeyClass(Text.class);

//job.setMapOutputValueClass(IntWritable.class);

//hadoop默认的是TextInputFormat和TextOutputFormat,所以说我们这里可以不用配置。

//job.setInputFormatClass(TextInputFormat.class);

//job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.addInputPath(job, new?Path(

"hdfs://cdh001:8020/user/worldcount.txt"));//FileInputFormat.addInputPath()指定的这个路径可以是单个文件、一个目录或符合特定文件模式的一系列文件。

//从方法名称可以看出,可以通过多次调用这个方法来实现多路径的输入。

FileOutputFormat.setOutputPath(job, new?Path(

"hdfs://cdh001:8020/user/wordcount-out"));//只能有一个输出路径,该路径指定的就是reduce函数输出文件的写入目录。

//特别注意:输出目录不能提前存在,否则hadoop会报错并拒绝执行作业,这样做的目的是防止数据丢失,因为长时间运行的作业如果结果被意外覆盖掉,那肯定不是我们想要的

System.exit(job.waitForCompletion(true) ? 0 : 1);

????//使用job.waitForCompletion()提交作业并等待执行完成,该方法返回一个boolean值,表示执行成功或者失败,这个布尔值被转换成程序退出代码0或1,该布尔参数还是一个详细标识,所以作业会把进度写到控制台。

//waitForCompletion()提交作业后,每秒会轮询作业的进度,如果发现和上次报告后有改变,就把进度报告到控制台,作业完成后,如果成功就显示作业计数器,如果失败则把导致作业失败的错误输出到控制台

}

}