欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【MapReduce】一个没有足够经验无法解决的简单基础MapReduce问题

程序员文章站 2022-07-14 17:56:09
...

一. 事件起因

我在学习Hadopp相关知识时做了一道极为简单的练习题,然而在解决问题的过程中,却引发了一个对我而言,极为困难,一个礼拜才勉强解决的惊天大BUG。(小弟才疏学浅,各位的独到见解和批评讨论都可以留言,虚心接受)

二.练习原题以及数据准备

问题题目

(两表 inner join)
在 hdfs 目录/tmp/table/student 中存在 stu.txt 文件,按 tab分隔,字段名为(学号,姓名,课程号,班级名称),
hdfs 目录/tmp/table/student_location 中存在 stu_location.txt 文件,按 tab 分隔,字段名为(学号,省份,城市,区名),
对两个 hdfs目录的按学号求交集,输出结果结构按 tab 分隔后的四个字段为(学号,姓名,课程号,班级名称)

文件内容

stu.txt 文件内容:
【MapReduce】一个没有足够经验无法解决的简单基础MapReduce问题
stu_location.txt 文件内容:

【MapReduce】一个没有足够经验无法解决的简单基础MapReduce问题

三.初次实现代码

(略长可选择式查看,或只看实现思路)

实现思路:

(1)在map阶段读取参数文件,即stu.txt和stu_location.txt,如果这个文件的记录切割完有4个元素,说明他是学生信息,有5个元素说明他是地址信息,将两个信息都按照学号分组向reduce输出。

(2)在reduce阶段按学号分组,接收分组信息,并存入"inflist"集合中,如果inflist中的这个元素的size==2,说明他既有学生信息,也有地址信息,那么就是需要的元素,拼接成字符串输出即可。



public class DIY15 {
	public static void main(String[] args) throws Exception {
		Configuration conf=new Configuration();
		Job job=Job.getInstance(conf);
		
		job.setJarByClass(DIY15.class);
		job.setMapperClass(JoinMapper.class);
		job.setReducerClass(JoinReducer.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(SinfoWritable.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileInputFormat.addInputPath(job, new Path(args[1]));
		FileOutputFormat.setOutputPath(job, new Path(args[2]));
		
		System.exit(job.waitForCompletion(true)?0:1);
	}
				//Mapper
	public static class JoinMapper extends Mapper<LongWritable, Text, IntWritable, SinfoWritable>{
		SinfoWritable sin=new SinfoWritable();
		@Override
		protected void map(
				LongWritable key,Text value,Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			String[] fileds=value.toString().split("\\t");
			if(fileds!=null&&fileds.length==4){
				
		sin.set(Integer.parseInt(fileds[0]),fileds[1], Integer.parseInt(fileds[2]), fileds[3], 
				"", "", "", "");
			
			}else if (fileds!=null&&fileds.length==5) {
		sin.set(Integer.parseInt(fileds[0]), "", 0, "",
				fileds[1], fileds[2],fileds[3],fileds[4]);
			}else {
				return;
			}
			context.write(new IntWritable(Integer.parseInt(fileds[0])), sin);
		
		}
		
	}
					//Reducer
	public static class JoinReducer extends Reducer<IntWritable, SinfoWritable, Text, NullWritable>{
		
		@Override
		protected void reduce(
				IntWritable sno,Iterable<SinfoWritable> values,Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			
			Iterator<SinfoWritable> it=values.iterator();
			List<SinfoWritable> inflist=new ArrayList<SinfoWritable>();
			StringBuilder rsBuilder=new StringBuilder();
			while(it.hasNext()){
				SinfoWritable inf=it.next();
				inflist.add(inf);
			}
			
			if(inflist.size()==2){
				//放了兩個元素
				if(!inflist.get(0).getSname().equals("")){
					//放入第一個元素 (1是學生信息 2是住處信息)
				rsBuilder.append(inflist.get(0).getSno());
				rsBuilder.append("\t");
				rsBuilder.append(inflist.get(0).getSname());
				rsBuilder.append("\t");
				rsBuilder.append(inflist.get(0).getCourseno());
				rsBuilder.append("\t");
				rsBuilder.append(inflist.get(0).getClassname());
					context.write(new Text(rsBuilder.toString()), NullWritable.get());
				}else if (!inflist.get(1).getSname().equals("")) {
					rsBuilder.append(inflist.get(1).getSno());
					rsBuilder.append("\t");
					rsBuilder.append(inflist.get(1).getSname());
					rsBuilder.append("\t");
					rsBuilder.append(inflist.get(1).getCourseno());
					rsBuilder.append("\t");
					rsBuilder.append(inflist.get(1).getClassname());
						context.write(new Text(rsBuilder.toString()), NullWritable.get());
				}
					
			}
		
		
		}
		
	}
	
//自定义的学生信息序列化类
	public static class SinfoWritable implements Writable{
		private int sno;
		private String sname;
		private int courseno;
		private String classname;
		private String province;
		private String city;
		private String area;
		private String addr;
		
		
		public SinfoWritable() {
			// TODO Auto-generated constructor stub
		}
		public int getSno() {
			return sno;
		}
		public String getSname() {
			return sname;
		}
		public int getCourseno() {
			return courseno;
		}
		public String getClassname() {
			return classname;
		}
		public String getProvince() {
			return province;
		}
		public String getCity() {
			return city;
		}
		public String getArea() {
			return area;
		}
		public String getAddr() {
			return addr;
		}
		
		public void set(int sno, String sname, int courseno, String classname, String province, String city,
				String area, String addr){
			this.sno = sno;
			this.area = area;
			this.city = city;
			this.classname = classname;
			this.courseno = courseno;
			this.province = province;
			this.sname = sname;
			this.addr = addr;
		}
		@Override
		public void write(DataOutput out) throws IOException {
			// TODO Auto-generated method stub
			out.writeInt(sno);
			out.writeInt(courseno);
			
			out.writeUTF(classname);
			out.writeUTF(sname);
			out.writeUTF(addr);
			out.writeUTF(area);
			out.writeUTF(city);
			out.writeUTF(province);
		}
		@Override
		public void readFields(DataInput in) throws IOException {
			// TODO Auto-generated method stub
			this.sno = in.readInt();
			this.courseno = in.readInt();
			
			this.classname = in.readUTF();
			this.sname = in.readUTF();	
			this.addr = in.readUTF();
			this.area = in.readUTF();
			this.city = in.readUTF();
			this.province = in.readUTF();


		
		}
		@Override
		public String toString() {
			return "SinfoWritable [sno=" + sno + ", sname=" + sname
					+ ", courseno=" + courseno + ", classname=" + classname
					+ ", province=" + province + ", city=" + city + ", area="
					+ area + ", addr=" + addr + "]";
		}
		
		

		
		
	}
}

四.问题出现

运行代码

【MapReduce】一个没有足够经验无法解决的简单基础MapReduce问题
当我自信满满的以为这个练习小case的时候,我运行了我的程序,没有任何异常和报错出现了这样的结果

运行结果

【MapReduce】一个没有足够经验无法解决的简单基础MapReduce问题

五.结果分析和尝试解决

结果分析

在stu.txt中有1~4四条信息,在stu_location.txt中有1,4两条信息,如果能够成功的进行链接join操作应该有2两条信息,分别是【1 张三 1 一班】【4 赵六 2 四班】但是只有 "1号张三" 的信息,运行结果明显是不正确的。

1.错误猜想

第一次看到执行结果,我以为是自己代码出了什么纰漏?

  • 1 .write(DataOutput out)方法和readFields(DataInput in) 的参数变量的顺序是否一致(一致)
		@Override
		public void write(DataOutput out) throws IOException {
			// TODO Auto-generated method stub
			out.writeInt(sno);
			out.writeUTF(sname);
			out.writeInt(courseno);
			out.writeUTF(classname);
			out.writeUTF(province);
			out.writeUTF(city);
			out.writeUTF(area);
			out.writeUTF(addr);
		}
		@Override
		public void readFields(DataInput in) throws IOException {
			// TODO Auto-generated method stub
			this.sno = in.readInt();
					this.sname = in.readUTF();	
			this.courseno = in.readInt();
			
			this.classname = in.readUTF();
			this.province = in.readUTF();
			this.city = in.readUTF();
			this.area = in.readUTF();
			this.addr = in.readUTF();
		}

2.在map阶段的记录读取是否正确(正确,这里不做太多代码展示了)

3.reduce阶段的记录输出是否正确(问题所在)
我发现只有reduce在接受map传输过来的分组信息时是正确的 但是输出时完全不正确,即缺少了数据。

第一次尝试

为了发现问题的细节,我进行了如下操作
(1)我为SinfoWritable类添加了一个 getAll() 方法,为了获取其中的成员变量信息,便于输出观察

		public String getAll(){
			return sno + "," + sname
					+ "," + courseno + "," + classname+","+province+","+city+","+area+","+","+addr;
		}

(2)将reduce阶段的代码重写了一遍,并进行了分阶段输出(写的可能有些潦草,但是能说明问题)
①在map传过来分组信息后直接输出他的学号
②将每组的信息输出到inflist中,并且当时输出他的所有信息
③放入Inflist,之后再一次输出所有长度为2的Inflist元素信息

这里一定会有人问,你这不脱裤子放气——多此一举么,输出一次又输出一次?
不要心急,这里的“多此一举”就是我发现的问题所在

	//Reducere
	public static class JoinReducer extends Reducer<IntWritable, SinfoWritable, Text, NullWritable>{
		
		@Override
		protected void reduce(
				IntWritable sno,Iterable<SinfoWritable> values,Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			Iterator<SinfoWritable> it=values.iterator();
			List<SinfoWritable> inflist=new ArrayList<SinfoWritable>();
			StringBuilder rsBuilder=new StringBuilder();
			context.write(new Text(sno.get()+"号"), NullWritable.get());
			
			while(it.hasNext()){
				SinfoWritable inf=it.next();
				inflist.add(inf);

				//inf中的属性,在放入inflist之后丢失了?
				context.write(new Text("list["+(inflist.size()-1)+"]---"+inflist.get(inflist.size()-1).getAll()), NullWritable.get());

			
					
					
				}
			if(inflist.size()==2){
				context.write(new Text("list[0] :"+inflist.get(0).getAll()), NullWritable.get());
				context.write(new Text("list[1] :"+inflist.get(1).getAll()), NullWritable.get());
				
			}			
		
		}
		
	}

第二次运行结果

【MapReduce】一个没有足够经验无法解决的简单基础MapReduce问题

第二次结果分析

不晓得各位发现问题没有,放入inflist集合中的元素
1.在第一次输出的时候,没有任何问题,不管是学生信息还是地址信息都输出了出来
(FIRST- -list[0]和list[1]),
2.然而同样的集合我在第二次输出的时候,怎么就都变成了同样的内容了呢???
(SECOND- -list[0]和list[1]都是学生信息 或都是地址信息)

哪怕一直到我最后解决问题,也没有弄明白个所以然,放入list中的元素,上一秒还是两条不同的信息,下一秒就变成一样的了。。。。这个问题我也是一直在调试和查阅资料,也没找到个所以然,最后找了个投机取巧的解决办法。

六.问题解决方案(不完善)

1.解决思路

将一切实例化对象推翻,直接都转换成字符串进行操作。

2.关键代码

(全部代码我会在最后贴出来,这里我们只看Reduce阶段的代码)

实现步骤

1.我将map向reduce发送的分组信息,从<IntWritable, SinfoWritable>类型改成了<IntWritable, Text>类型
2.我将Inflist的元素泛型由Text改变成了String类型,一切以字符串来操作(这里吧StringBuilder也删除了,一切从简)。

	//Reducer
	public static class JoinReducer extends Reducer<IntWritable, Text, Text, NullWritable>{
		
		@Override
		protected void reduce(
				IntWritable sno,Iterable<Text> values,Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			Iterator<Text> it= values.iterator();
			List<String> list=new ArrayList<String>();
			context.write(new Text(sno.get()+"号"), NullWritable.get());
			//要把接受过来的信息toString一下,不然可能会出现特殊的bug,传递对象的问题
			while(it.hasNext()){
				list.add(it.next().toString());
				context.write(new Text("FIRST--list["+(list.size()-1)+"]---"+list.get(list.size()-1)), NullWritable.get());

				//输出信息
			}
			if(list.size()==2){
				context.write(new Text("SECOND--list[0] :"+list.get(0)), NullWritable.get());
				context.write(new Text("SECOND--list[1] :"+list.get(1)), NullWritable.get());
				
			}
			
		
		}
		
	}

3.最终结果

【MapReduce】一个没有足够经验无法解决的简单基础MapReduce问题
这里可以看到最终的输出结果,无论是第一次输出还是第二次输出,它的结果都是一样的。

七.思考

这个问题虽然说是投机取巧解决了,不过也引发了我的一些思索。我觉得最大的可能还是和Text这种可序列化类型有关系,涉及到一些文件流的操作,将它转换为字符串之后就没有这样的问题了。

然而我现阶段也没能理解的很透彻,也是阅历和经验不大足够,欢迎各位大佬留言指点,文章略长,也比较啰嗦,也是头一回分享,看到这里也是不容易了:),感谢!