
Hadoop--MapReduce6--Controlling Input and Output Formats


In a MapReduce job, the map phase processes the input and produces a series of key-value pairs, and the reduce phase then aggregates the values that share a key. Map tasks run on different machines and write their output to the machine they run on; each reduce task must first download the map output files to its own machine. The format of the files written by the map and reduce phases therefore matters: a compact format can greatly reduce the amount of data transferred.
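As a concrete (optional) way to shrink that transfer, Hadoop can compress the intermediate map output independently of the job's output format. A minimal sketch using the standard configuration keys; it assumes the Snappy codec is available on the cluster:

Configuration conf = new Configuration();
// Compress map output before it is shuffled to the reducers
conf.setBoolean("mapreduce.map.output.compress", true);
// Snappy trades a little CPU for much smaller shuffle traffic
conf.setClass("mapreduce.map.output.compress.codec",
              org.apache.hadoop.io.compress.SnappyCodec.class,
              org.apache.hadoop.io.compress.CompressionCodec.class);
Job job = Job.getInstance(conf);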

When submitting a job, the output format can be set through the Job object's setOutputFormatClass method:

 /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, 
                  OutputFormat.class);
  }

The class passed in must be a concrete subclass of the abstract OutputFormat class:

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputFormat<K, V> {

  /** 
   * Get the {@link RecordWriter} for the given task.
   *
   * @param context the information about the current task.
   * @return a {@link RecordWriter} to write the output for the job.
   * @throws IOException
   */
  public abstract RecordWriter<K, V> 
    getRecordWriter(TaskAttemptContext context
                    ) throws IOException, InterruptedException;

  /** 
   * Check for validity of the output-specification for the job.
   *  
   * <p>This is to validate the output specification for the job when the
   * job is submitted.  Typically checks that it does not already exist,
   * throwing an exception when it already exists, so that output is not
   * overwritten.</p>
   *
   * @param context information about the job
   * @throws IOException when output should not be attempted
   */
  public abstract void checkOutputSpecs(JobContext context
                                        ) throws IOException, 
                                                 InterruptedException;

  /**
   * Get the output committer for this output format. This is responsible
   * for ensuring the output is committed correctly.
   * @param context the task context
   * @return an output committer
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract 
  OutputCommitter getOutputCommitter(TaskAttemptContext context
                                     ) throws IOException, InterruptedException;
}

You can subclass this abstract class to implement a custom format (a minimal sketch follows the figure below). The formats Hadoop ships with include:

[Figure: the hierarchy of OutputFormat implementations provided by Hadoop]
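As promised, a minimal custom-format sketch. The class name UpperCaseTextOutputFormat and its on-disk layout are made up for illustration; extending FileOutputFormat (rather than OutputFormat directly) means checkOutputSpecs and getOutputCommitter are inherited, leaving only getRecordWriter to implement:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical format that writes "KEY<TAB>count" lines with upper-cased keys
public class UpperCaseTextOutputFormat extends FileOutputFormat<Text, IntWritable> {

	@Override
	public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext context)
			throws IOException, InterruptedException {
		// Each task writes its own part file under the job output directory
		Path file = getDefaultWorkFile(context, ".txt");
		FileSystem fs = file.getFileSystem(context.getConfiguration());
		final FSDataOutputStream out = fs.create(file, false);

		return new RecordWriter<Text, IntWritable>() {
			@Override
			public void write(Text key, IntWritable value) throws IOException {
				// The custom on-disk representation lives entirely here
				out.writeBytes(key.toString().toUpperCase() + "\t" + value.get() + "\n");
			}

			@Override
			public void close(TaskAttemptContext ctx) throws IOException {
				out.close();
			}
		};
	}
}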

TextOutputFormat is the default. Let's try out SequenceFileOutputFormat:

In step one of the inverted-index program, set the format when the job is submitted:

job.setOutputFormatClass(SequenceFileOutputFormat.class);
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class IndexStepOne {

	public static class IndexStepOneMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
		
		String fileName = null;
		
		@Override
		protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			FileSplit inputSplit = (FileSplit) context.getInputSplit();
			fileName = inputSplit.getPath().getName();
		}

		// Emit pairs like <hello-filename, 1>
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String[] words = value.toString().split(" ");
			for (String w : words) {
				// Use "word-filename" as the key and 1 as the value
				context.write(new Text(w + "-" + fileName), new IntWritable(1));
			}
		}
	}

	public static class IndexStepOneReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			int count = 0;
			for (IntWritable value : values) {
				count += value.get();
			}
			context.write(key, new IntWritable(count));
		}
	}
	
	
	
	public static void main(String[] args) throws Exception{
		
		Configuration conf = new Configuration(); 
		
		Job job = Job.getInstance(conf);
		job.setJarByClass(IndexStepOne.class);
		job.setMapperClass(IndexStepOneMapper.class);
		job.setReducerClass(IndexStepOneReducer.class);
		job.setNumReduceTasks(1);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		job.setOutputFormatClass(SequenceFileOutputFormat.class);

		FileInputFormat.setInputPaths(job, new Path("F:\\hadoop-2.8.1\\data\\index\\input"));
		FileOutputFormat.setOutputPath(job, new Path("F:\\hadoop-2.8.1\\data\\index\\out3"));

		job.waitForCompletion(true);	
	}
}
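Incidentally, SequenceFileOutputFormat can also compress what it writes. Two extra lines that could go before job.waitForCompletion(true), shown as a hedged aside rather than part of the original program (they need import org.apache.hadoop.io.SequenceFile):

// Optional: block-compress the SequenceFile output
FileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);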

The file the reducer finally writes is a binary SequenceFile; viewed as raw text it looks roughly like this:

SEQ org.apache.hadoop.io.Text org.apache.hadoop.io.IntWritable  (binary header and sync markers; unprintable bytes omitted)
c++-c.txt     hello-a.txt   hello-b.txt   hello-c.txt
jack-b.txt    java-c.txt    jerry-b.txt   jerry-c.txt
jim-a.txt     jim-b.txt     kitty-a.txt   kitty-b.txt
rose-a.txt    tom-a.txt
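To verify what actually landed in the file, it can be read back with SequenceFile.Reader. A small sketch; part-r-00000 is the default output file name for a single reducer, and the class name DumpIndexStepOne is made up for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class DumpIndexStepOne {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Path file = new Path("F:\\hadoop-2.8.1\\data\\index\\out3\\part-r-00000");
		// Reader.file(...) is the Hadoop 2.x way to point the reader at a file
		try (SequenceFile.Reader reader =
				new SequenceFile.Reader(conf, SequenceFile.Reader.file(file))) {
			Text key = new Text();
			IntWritable value = new IntWritable();
			while (reader.next(key, value)) {
				System.out.println(key + "\t" + value.get());
			}
		}
	}
}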

Step two: configure the format for reading the file. Note that the mapper's input types are no longer LongWritable and Text but Text and IntWritable, the types written in the previous step; once the input format changes, the mapper no longer reads the data line by line.

job.setInputFormatClass(SequenceFileInputFormat.class);
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IndexStepTwo {

	public static class IndexStepTwoMapper extends Mapper<Text, IntWritable, Text, Text> {

		@Override
		protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
			String[] split = key.toString().split("-");
			context.write(new Text(split[0]), new Text(split[1].replaceAll("\t", "-->")));	
		}
	}

	public static class IndexStepTwoReducer extends Reducer<Text, Text, Text, Text> {
		// One group of data: <hello, a.txt> <hello, b.txt> <hello, c.txt>
		@Override
		protected void reduce(Text key, Iterable<Text> values,Context context)
				throws IOException, InterruptedException {
			StringBuilder sb = new StringBuilder();	
			for (Text value : values) {
				sb.append(value.toString()).append("\t");
			}
			context.write(key, new Text(sb.toString()));
		}
	}
	
	public static void main(String[] args) throws Exception{
		
		Configuration conf = new Configuration();
		
		Job job = Job.getInstance(conf);
		job.setJarByClass(IndexStepTwo.class);
		job.setMapperClass(IndexStepTwoMapper.class);
		job.setReducerClass(IndexStepTwoReducer.class);
		job.setNumReduceTasks(1);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setInputFormatClass(SequenceFileInputFormat.class);

		FileInputFormat.setInputPaths(job, new Path("F:\\hadoop-2.8.1\\data\\index\\out3"));
		FileOutputFormat.setOutputPath(job, new Path("F:\\hadoop-2.8.1\\data\\index\\out4"));

		job.waitForCompletion(true);
	}
}

Final result:

c++	c.txt	
hello	a.txt	b.txt	c.txt	
jack	b.txt	
java	c.txt	
jerry	b.txt	c.txt	
jim	a.txt	b.txt	
kitty	a.txt	b.txt	
rose	a.txt	
tom	a.txt