
MIT6.824 lab1 MapReduce 2018版本Part1 Map/Reduce input and output


The 2020 version makes some adjustments on top of the 2018 labs: it is more open-ended, but that also adds some difficulty. I recommend doing the 2018 version first and then the 2020 one; the quality of your code will be better for it.


Preamble: Getting familiar with the source

The mapreduce package provides a simple Map/Reduce library (in the mapreduce directory). Applications should normally call Distributed() [located in master.go] to start a job, but may instead call Sequential() [also in master.go] to get a sequential execution for debugging.
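For debugging it helps to see how a test drives the sequential path. Here is a minimal sketch, assuming the 2018 skeleton's Sequential() signature in master.go (job name, input file list, nReduce, then the application's map and reduce functions); mapF and reduceF stand in for your own application functions:

// Inside the mapreduce package (e.g. in a test): run the job over one
// input file with a single reduce task, entirely sequentially.
mr := Sequential("test", []string{"input.txt"}, 1, mapF, reduceF)
mr.Wait() // block until the whole job has finished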


Part I: Map/Reduce input and output

The Map/Reduce implementation you are given is missing some pieces. Before you can write your first Map/Reduce function pair, you will need to fix the sequential implementation. In particular, the code we give you is missing two crucial pieces: the function that divides up the output of a map task, and the function that gathers all the inputs for a reduce task. These tasks are carried out by the doMap() function in common_map.go, and the doReduce() function in common_reduce.go respectively. The comments in those files should point you in the right direction.

To help you determine if you have correctly implemented doMap() and doReduce(), we have provided you with a Go test suite that checks the correctness of your implementation. These tests are implemented in the file test_test.go. To run the tests for the sequential implementation that you have now fixed, run go test -run Sequential (the command is given again at the end of this post).


The docstring for doMap() summarizes the first half of the task:

doMap manages one map task: it should read one of the input files (inFile), call the user-defined map function (mapF) for that file's contents, and partition mapF's output into nReduce intermediate files.
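As a quick illustration of the partitioning rule, here is a self-contained sketch of how a key is assigned to a reduce bucket. The FNV-1a ihash shown is what I believe the 2018 skeleton provides in common_map.go; treat it as an assumption if your copy differs:

package main

import (
	"fmt"
	"hash/fnv"
)

// ihash mirrors the lab skeleton's helper (assumed): FNV-1a hash of
// the key, masked to a non-negative int.
func ihash(s string) int {
	h := fnv.New32a()
	h.Write([]byte(s))
	return int(h.Sum32() & 0x7fffffff)
}

func main() {
	nReduce := 3
	for _, key := range []string{"apple", "banana", "cherry"} {
		// reduce task r for this key/value pair
		r := ihash(key) % nReduce
		fmt.Printf("key %q -> reduce task %d\n", key, r)
	}
}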


Below are my implementations of doMap and doReduce.

The main difficulty is the file I/O and writing the JSON files; for that part I mostly relied on the Go documentation (see the standalone JSON round-trip sketch after the doMap code below).

func doMap(
	jobName string, // the name of the MapReduce job
	mapTask int, // which map task this is
	inFile string,
	nReduce int, // the number of reduce task that will be run ("R" in the paper)
	mapF func(filename string, contents string) []KeyValue,
) {
	//
	// doMap manages one map task: it should read one of the input files
	// (inFile), call the user-defined map function (mapF) for that file's
	// contents, and partition mapF's output into nReduce intermediate files.
	//
	// There is one intermediate file per reduce task. The file name
	// includes both the map task number and the reduce task number. Use
	// the filename generated by reduceName(jobName, mapTask, r)
	// as the intermediate file for reduce task r. Call ihash() (see
	// below) on each key, mod nReduce, to pick r for a key/value pair.
	//
	// mapF() is the map function provided by the application. The first
	// argument should be the input file name, though the map function
	// typically ignores it. The second argument should be the entire
	// input file contents. mapF() returns a slice containing the
	// key/value pairs for reduce; see common.go for the definition of
	// KeyValue.
	//
	// Look at Go's ioutil and os packages for functions to read
	// and write files.
	//
	// Coming up with a scheme for how to format the key/value pairs on
	// disk can be tricky, especially when taking into account that both
	// keys and values could contain newlines, quotes, and any other
	// character you can think of.
	//
	// One format often used for serializing data to a byte stream that the
	// other end can correctly reconstruct is JSON. You are not required to
	// use JSON, but as the output of the reduce tasks *must* be JSON,
	// familiarizing yourself with it here may prove useful. You can write
	// out a data structure as a JSON string to a file using the commented
	// code below. The corresponding decoding functions can be found in
	// common_reduce.go.
	//
	//   enc := json.NewEncoder(file)
	//   for _, kv := ... {
	//     err := enc.Encode(&kv)
	//
	// Remember to close the file after you have written all the values!
	//
	// Your code here (Part I).
	//

	content, err := ioutil.ReadFile(inFile)
	if err != nil {
		log.Fatalf("doMap: read %v: %v", inFile, err)
	}

	keyValues := mapF(inFile, string(content))

	// Bucket the key/value pairs: pair -> reduce task ihash(key) % nReduce.
	interResults := make([][]KeyValue, nReduce)
	for _, keyValue := range keyValues {
		iReduce := ihash(keyValue.Key) % nReduce
		interResults[iReduce] = append(interResults[iReduce], keyValue)
	}

	// Write one intermediate file per reduce task, JSON-encoding each pair.
	for i := 0; i < nReduce; i++ {
		outFile := reduceName(jobName, mapTask, i)
		f, openErr := os.OpenFile(outFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0664)
		if openErr != nil {
			log.Fatal("OpenFile: ", openErr)
		}

		enc := json.NewEncoder(f)
		for _, kv := range interResults[i] {
			if err := enc.Encode(&kv); err != nil {
				log.Fatal("Encode: ", err)
			}
		}

		// Close now rather than defer: deferred closes would pile up
		// until doMap returns, holding all nReduce files open at once.
		f.Close()
	}

}
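Since the JSON encoding and decoding was the fiddly part for me, here is a self-contained round-trip example (not lab code; the file name is illustrative) showing the exact encoder/decoder pattern used in doMap above and doReduce below:

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"os"
)

// KeyValue mirrors the lab's type from common.go.
type KeyValue struct {
	Key   string
	Value string
}

func main() {
	const name = "mrtmp.demo" // illustrative file name

	// Encode: one JSON object per KeyValue, appended to the file.
	f, err := os.Create(name)
	if err != nil {
		log.Fatal(err)
	}
	enc := json.NewEncoder(f)
	for _, kv := range []KeyValue{{"a", "1"}, {"b", "2"}} {
		if err := enc.Encode(&kv); err != nil {
			log.Fatal(err)
		}
	}
	f.Close()

	// Decode: call Decode repeatedly until it returns io.EOF.
	f, err = os.Open(name)
	if err != nil {
		log.Fatal(err)
	}
	dec := json.NewDecoder(f)
	for {
		var kv KeyValue
		if err := dec.Decode(&kv); err == io.EOF {
			break
		} else if err != nil {
			log.Fatal(err)
		}
		fmt.Println(kv.Key, kv.Value)
	}
	f.Close()
	os.Remove(name)
}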


func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTask int, // which reduce task this is
	outFile string, // write the output here
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	//
	// doReduce manages one reduce task: it should read the intermediate
	// files for the task, sort the intermediate key/value pairs by key,
	// call the user-defined reduce function (reduceF) for each key, and
	// write reduceF's output to disk.
	//
	// You'll need to read one intermediate file from each map task;
	// reduceName(jobName, m, reduceTask) yields the file
	// name from map task m.
	//
	// Your doMap() encoded the key/value pairs in the intermediate
	// files, so you will need to decode them. If you used JSON, you can
	// read and decode by creating a decoder and repeatedly calling
	// .Decode(&kv) on it until it returns an error.
	//
	// You may find the first example in the golang sort package
	// documentation useful.
	//
	// reduceF() is the application's reduce function. You should
	// call it once per distinct key, with a slice of all the values
	// for that key. reduceF() returns the reduced value for that key.
	//
	// You should write the reduce output as JSON encoded KeyValue
	// objects to the file named outFile. We require you to use JSON
	// because that is what the merger that combines the output
	// from all the reduce tasks expects. There is nothing special about
	// JSON -- it is just the marshalling format we chose to use. Your
	// output code will look something like this:
	//
	// enc := json.NewEncoder(file)
	// for key := ... {
	// 	enc.Encode(KeyValue{key, reduceF(...)})
	// }
	// file.Close()
	//
	// Your code here (Part I).
	//

	// Gather every value for each key across the nMap intermediate files.
	interMaps := map[string][]string{}
	for i := 0; i < nMap; i++ {
		inFile := reduceName(jobName, i, reduceTask)
		file, err := os.Open(inFile)
		if err != nil {
			log.Fatal("Open intermediate file: ", err)
		}

		// Decode KeyValue objects one at a time until Decode returns io.EOF.
		decoder := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := decoder.Decode(&kv); err != nil {
				if err == io.EOF {
					break
				}
				log.Fatal(err)
			}
			// append works on a nil slice, so the first occurrence of a
			// key needs no special case.
			interMaps[kv.Key] = append(interMaps[kv.Key], kv.Value)
		}
		file.Close()
	}

	// Go maps iterate in random order, so sort the keys (sort package)
	// to meet the "sort the intermediate key/value pairs by key"
	// requirement from the docstring above.
	keys := make([]string, 0, len(interMaps))
	for key := range interMaps {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	f, openErr := os.OpenFile(outFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0664)
	if openErr != nil {
		log.Fatal("Open result file: ", openErr)
	}
	enc := json.NewEncoder(f)
	for _, key := range keys {
		if err := enc.Encode(KeyValue{key, reduceF(key, interMaps[key])}); err != nil {
			log.Fatal("Encode: ", err)
		}
	}
	f.Close()
}
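To see what the user-supplied mapF and reduceF actually look like, here is a self-contained word-count pair (the lab's Part II asks for functions of this shape; the names and bodies here are one possible sketch, not the reference solution):

package main

import (
	"fmt"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue mirrors the lab's type from common.go.
type KeyValue struct {
	Key   string
	Value string
}

// One possible mapF for word count: emit ("word", "1") for every word.
func wordCountMap(filename string, contents string) []KeyValue {
	// Split on anything that is not a letter.
	words := strings.FieldsFunc(contents, func(r rune) bool {
		return !unicode.IsLetter(r)
	})
	kvs := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kvs = append(kvs, KeyValue{Key: w, Value: "1"})
	}
	return kvs
}

// The matching reduceF: every value is "1", so the count is len(values).
func wordCountReduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	for _, kv := range wordCountMap("demo.txt", "the quick brown fox the") {
		fmt.Println(kv)
	}
	fmt.Println("the ->", wordCountReduce("the", []string{"1", "1"}))
}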


Test the implementation with:

go test -run Sequential

