欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

6.824 lab1

程序员文章站 2022-07-12 17:03:42
...

 

一、实验说明

https://pdos.csail.mit.edu/6.824/labs/lab-1.html

二、梳理

在 lab1 中需要编写填充MapReduce 部分代码,使用 Go 建立一个容错的分布式系统。

mapreduce 包提供了一个简单的 Map/Reduce 库,调用 master.go/Distributed()来开始任务

mapreduce在运行大致为5部分 
1. input:获取输入数据进行分片作为map的输入
2. map:将输入记录解析成一条或多条记录
3. shuffle:对中间数据的处理,作为reduce的输入 
4. reduce:对相同key的数据进行合并 
5. output:按照格式输出到指定目录

代码主要流程如下:

  1. Master 读取输入文件,然后调用 common_map.go中的doMap()。 doMap()运行 mapF (),将文件转化为 key/value (KeyValue)结果写入 nReduce个中间文件之中。在 map 结束后总共会生成 nMap * nReduce个文件。
  2. 之后调用 common_reduce.go/doReduce()doReduce()将按照 reduce task 编号(reduceTask)来汇总,生成 nReduce个结果文件。

Part I: Map/Reduce input and output

描述

doMap() 

  • 读取数据
  • 调用mapF 把文件进行解析成KeyValue
  • 生成 nReduce 个子文件
  • 利用 KeyValue中的 Key 值做哈希,将得到的值对 nReduce 取余,根据结果将KeyValue存入对应文件之中
func doMap(
	jobName string, // the name of the MapReduce job
	mapTask int,    // which map task this is
	inFile string,
	nReduce int, // the number of reduce task that will be run ("R" in the paper)
	mapF func(filename string, contents string) []KeyValue,
) {
	fileContent, err := ioutil.ReadFile(inFile)

	if err != nil {
		log.Fatal(err)
	}

	// use JSON, but as the output of the reduce tasks *must* be JSON
	encoders := make([]*json.Encoder, nReduce)

	// nReduce the number of reduce task that will be run
	for i := 0; i < nReduce; i++ {
		var err error

		filename := reduceName(jobName, mapTask, i)

		outFile, err := os.Create(filename)

		if err != nil {
			log.Fatal(err)
		}

		defer outFile.Close()

		encoders[i] = json.NewEncoder(outFile)
	}

	mapRes := mapF(inFile, string(fileContent))

	for _, kv := range mapRes {
		index := int(ihash(kv.Key)) % nReduce
		encoders[index].Encode(&kv)
	}

}

doReduce()

  • 读取同一个reduce task下所有文件
  • 解析文件中的KeyValue
  • 对KeyValue进行排序
  • 生成一个文件,并根据排序将KeyValue写入其中
func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTask int, // which reduce task this is
	outFile string, // write the output here
	nMap int,       // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	keyValues := make(map[string][]string)

	for i := 0; i < nMap; i++ {
		// reduceName(jobName, m, reduceTask) yields the file
		filename := reduceName(jobName, i, reduceTask)

		file, err := os.Open(filename)

		if err != nil {
			log.Fatal(err)
		}

		enc := json.NewDecoder(file)

		for {
			var kv KeyValue

			err := enc.Decode(&kv)

			if err != nil {
				break
			}

			_, ok := keyValues[kv.Key]

			if !ok {
				keyValues[kv.Key] = make([]string, 0)
			}

			keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)
		}
	}

	var keys []string

	for k := range keyValues {
		keys = append(keys, k)
	}

	// doReduce manages one reduce task: it should read the intermediate
	// files for the task, sort the intermediate key/value pairs by key,
	sort.Strings(keys)

	mergeFile := mergeName(jobName, reduceTask)

	file, err := os.Create(mergeFile)

	if err != nil {
		fmt.Printf("create reduce merge file:%s error.", mergeFile)
		return
	}

	defer file.Close()

	enc := json.NewEncoder(file)

	for _, k := range keys {
		enc.Encode(KeyValue{k, reduceF(k, keyValues[k])})
	}

}

Part II: Single-worker word count

描述

实现字数统计—一个简单的Map / Reduce示例。通过修改main / wc.go中的mapF()和reduceF()函数,以便 wc.go报告输入中每个单词的出现次数。单词是任何连续的字母序列,由unicode.IsLetter确定

mapF

  • 读取文件,根据unicode.IsLetter判断是否为字母,进行split
  • 将split后的数据组装成KeyValue

reduceF

  • 返回values的length(即为key的出现次数)
func mapF(filename string, contents string) []mapreduce.KeyValue {
	// Your code here (Part II).
	var res []mapreduce.KeyValue

	words := strings.FieldsFunc(contents, func(x rune) bool {
		return unicode.IsLetter(x) == false
	})

	for _, w := range words {
		kv := mapreduce.KeyValue{w, "1"}
		res = append(res, kv)
	}

	return res
}
func reduceF(key string, values []string) string {
	// Your code here (Part II).
	return strconv.Itoa(len(values))
}

Part III: Distributing MapReduce tasks

描述

目前为止我们都是串行地执行任务,而Map / Reduce最大的优势是可以自动并行化执行普通代码,而无需开发人员进行任何额外的工作。在本部分的实验中,您将完成一个MapReduce版本,该版本将工作划分为一组在多个内核上并行运行的工作线程。虽然不像实际的Map / Reduce部署那样分布在多台计算机上,但是您的实现将使用RPC来模拟分布式计算。

mapreduce / master.go中 的代码完成了管理MapReduce作业的大部分工作。我们还与辅助线程的完整代码,在为您提供的MapReduce / worker.go,以及一些代码来处理RPC中的MapReduce / common_rpc.go。

您的工作是 在mapreduce / schedule.go中实现schedule()。Master在MapReduce作业期间两次调用schedule(),一次在Map阶段,一次在Reduce阶段。schedule()的工作是将任务分发给可用的worker。通常会有比worker thread更多的任务,因此schedule()必须给每个worker thread一个Task序列。 schedule()应等待所有任务完成,然后返回。

schedule()通过读取registerChan参数来Workers信息,得到一个包含 Worker 的 RPC 地址的 string。有些Worker可能在调用schedule()之前就已经存在,而有些Worker可能在schedule()运行时启动;全部将出现在 registerChan上。schedule()应该使用所有Worker,包括启动后出现的Worker

schedule()通过将Worker.DoTask RPC 发送给Worker来通知Worker执行任务。该RPC的参数由DoTaskArgs在mapreduce / common_rpc.go中定义。schedule()可以在mapFiles中找到这些文件名。

使用mapreduce / common_rpc.go中的call()函数 将RPC发送给工作程序。第一个参数是工作程序的地址,从registerChan读取。第二个参数应为“ Worker.DoTask”。第三个参数应为DoTaskArgs结构,最后一个参数应为nil。

schedule

  • sync.WaitGroup确保所有task执行完成才结束
  • 通过读取registerChan参数来Workers信息,得到一个包含 Worker 的 RPC 地址的 string
  • 将Worker.DoTask RPC 发送给Worker来通知Worker执行任务

需要注意的问题

chan死锁

registerChan为无缓冲信道ch := make(chan string)

无缓冲信道本身不存储信息,它只负责转手,有人传给它,它就必须要传给别人,如果只有进或者只有出的操作,都会造成阻塞

registerChan <- workAddr后,这一句会阻塞等待其它goroutine从out读取,如果没有就会持续堵塞goroutine
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
	var ntasks int
	var n_other int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks = len(mapFiles)
		n_other = nReduce
	case reducePhase:
		ntasks = nReduce
		n_other = len(mapFiles)
	}

	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

	var wGroup sync.WaitGroup
	wGroup.Add(ntasks)

	for i := 0; i < ntasks; i++ {
		go func(i int) {
			defer wGroup.Done()

			filename := ""

			if i <= len(mapFiles) {
				filename = mapFiles[i]
			}

			taskArgs := DoTaskArgs{
				JobName:       jobName,
				File:          filename,
				Phase:         phase,
				TaskNumber:    i,
				NumOtherPhase: n_other,
			}

			taskFinished := false

			for taskFinished == false {
				workAddr := <-registerChan

				taskFinished = call(workAddr, "Worker.DoTask", taskArgs, nil)

				go func() {
					registerChan <- workAddr
				}()
			}

		}(i)
	}

	wGroup.Wait()

	fmt.Printf("Schedule: %v done\n", phase)
}

Part IV: Handling worker failures

描述

在这一部分中,您将使Master处理failed Worker。MapReduce使此操作相对容易,因为Worker没有持久状态。如果Worker在处理来自Master的RPC时失败,则由于超时,主服务器的call()最终将返回false。在这种情况下,Master 需要把任务交给另一个 Worker。

RPC失败并不一定意味着该Worker未执行任务;Worker可能已经执行了它,但是Reply丢失了,或者Worker仍在执行但Master的RPC超时了。因此,可能会发生两个Worker接收同一任务,对其进行计算并生成输出的情况。对于给定的输入,需要两次调用map或reduce函数生成相同的输出,因此,如果后续处理有时读取一个输出,有时又读取另一个输出,则不会出现不一致的情况。此外,MapReduce框架可确保map和reduce函数的输出原子显示:输出文件将不存在,或者包含map或reduce函数的一次执行的全部输出

taskFinished := false

			for taskFinished == false {
				workAddr := <-registerChan

				taskFinished = call(workAddr, "Worker.DoTask", taskArgs, nil)

				go func() {
					registerChan <- workAddr
				}()
			}

Part V: Inverted index generation

描述

倒排索引在计算机科学中被广泛使用,并且在文档搜索中特别有用。广义而言,倒排索引是从有关基础数据的有趣事实到该数据的原始位置的映射。例如,在搜索的上下文中,它可能是从关键字到包含这些单词的文档的映射。

我们在main / ii.go 中创建了第二个二进制文件,该文件与您之前构建的wc.go非常相似。您应该在main / ii.go中修改mapF和 reduceF,以便它们一起产生一个反向索引。

  • 通过map去重
  • sort.Strings对values排序,然后按照格式次数 排序后的文件名返回结果
func mapF(document string, value string) (res []mapreduce.KeyValue) {
	// Your code here (Part V).
	words := strings.FieldsFunc(value, func(x rune) bool {
		return unicode.IsLetter(x) == false
	})

	kvMap := make(map[string]string)

	for _, w := range words {
		kvMap[w] = document
	}

	for k, v := range kvMap {
		res = append(res, mapreduce.KeyValue{k, v})
	}

	return res
}
func reduceF(key string, values []string) string {
	// Your code here (Part V).
	vLen := len(values)

	sort.Strings(values)

	res := strconv.Itoa(vLen) + " " + strings.Join(values, ",")

	return res
}