欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

《Distributed Systems》(6.824)LAB1(mapreduce)

程序员文章站 2022-07-12 17:39:32
...

开始之前

6.824是mit开的一门分布式系统的课程。LAB使用GO语言完成。详细信息查看课程首页。
课程视频B站
课程首页
go文档需要*访问
我在本地安装虚拟机使用的环境是ubuntu20.04,使用vscode进行远程开发debug。

lab

完成该lab最重要的参考资料是论文MapReduce (2004)Assigned: Lab 1: MapReduce,上面两个务必非常仔细的阅读,特别是任务说明。否则细节的处理可能就和要求不一样导致出现问题。

该lab我花了2天半完成,另外我觉得这个lab还是自己完成比较好,能够真正的知道mapreduce的细节,自己做才能真的熟悉go语言。建议还是自己尝试尝试再看别人的博客。

  • 在实验开始前需要仔细弄明白实验给出的单进程单线程mapreducemrsequential.go文件,后面的实现中部分代码可以参考该文件。同时对于该文件不要误入歧途,比如我做的时候就因为纠结map的输出文件为什么是mr-x-y而不是mr-y以及reduce的输出文件为什么是mr-out-y而不是mr-out-0。这在实验要求文档都说明的很清楚。
  • 可能是我的虚拟机比较辣鸡,测试的reduce paraller样例始终过不去,翻了test-mr.sh才发现是由于服务器启动之后睡眠一秒启动客户端,这时候可能服务器并没有来得及启动客户端就rpc了,造成卡死,因此可以在sh中增加启动服务端以后的睡眠时间或者在mrwork.go中睡眠1秒再调用worker()
  • 对于实验指导文档中提到的防止worker崩溃或者卡的时间太长(通过阅读测试样例sh调用的map reduce函数发现= =),使用ioutil.TempFile创建临时文件和os.Rename对临时文件改名。我在完成之后考虑到如果出现两个worker同时os.Rename到同一个名字会出现一个被一个完全覆盖还是中断还是其他情况这一点,我考虑可以使用的方法是用每个机器一个machineId来请求master得到许可再os.Rename处理。可能是由于这种情况碰到的概率太小,又或者是完全覆盖,不会出现BUG,同时我看到这位老哥博客的讨论区6.824(2020春) Lab1:MapReduce说到真正的分布式系统不会遇到该情况,因此我后续的代码中尽管定义出了结构体中的machineID但是暂时没有处理该情况。

设计

该实验需要完成下列五个文件,对于实验指导文档里的东西就不多提了,下面介绍一下主要思路(懒)
《Distributed Systems》(6.824)LAB1(mapreduce)
参考原始master.gowork.go的代码,考虑使用master来作为服务器,worker作为客户端,通过work发送rpc来调用master的请求任务函数,任务完成再进行rpc来调用完成任务函数。再master上维护一下结构体如下

type Master struct {
	// Your definitions here.
	State int	//0:map 1:reduce 3:完成 
	NMap int	//最大并行map的个数,哈希的个数
	NReduce int	//最大并行reduce的个数,哈希的个数
	MapTask map[int]*Task	//map任务
	ReduceTask map[int]*Task	//reduce任务
	//MachineNum弃用,因为使用到os.Rename覆盖文件,我写到后面意识到是单机,文件可以直接覆盖不用考虑不同机器产生的输出文件会多余
	MachineNum int	//用来给机器分配machine id 从1开始分配 
	Mu sync.Mutex	//只能有一个worker访问
}

type Task struct {
	Filename string	//应该打开的文件名,如果是reduce任务,此项不必须
	//MachineId弃用,因为使用到os.Rename覆盖文件,我写到后面意识到是单机,文件可以直接覆盖不用考虑不同机器产生的输出文件会多余
	MachineId int 	//执行该任务的机器id
	State int	// 0 表示未做,1表示正在做,2表示完成
	Runtime int	//已经运行的时间,每次始终滴答,该记录+1
}

mrmaster每一秒产生一个始终滴答,调用master的TimeTick(),这个函数根据m.State判断当前是map阶段还是reduce阶段遍历m.MapTask和m.ReduceTask对正在做的任务的Runtime加1,如果超过10,则该task置为未完成状态(可以分配该任务)。唯一用到的锁是worker对master的rpc函数开始和结束位置上个互斥锁保护代码块。

代码

rpc.go

rpc.go

package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"


// Add your RPC definitions here.

type AskArgs struct {
	MachineId int
}

type AskReply struct {
	State int	//0 map 1 reduce 2无任务等待0-1之间的过程
	Filename string	//任务文件名,如果是reduce任务,不需要文件名,因为要对所有key哈希相同的文件做合并
	TaskNumber int	//任务号
	NMap int
	NReduce int
	MachineId int
}

type FinishArgs struct {
	State int	//0: finish map, 1: finish reduce
	TaskNumber int
}

type FinishReply struct {
	State int	//0: 未完成, 1: 完成mapreduce
}
// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the master.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func masterSock() string {
	s := "/var/tmp/824-mr-"
	s += strconv.Itoa(os.Getuid())
	return s
}

mrmatser.go和master.go

mrmaster.go

package main

//
// start the master process, which is implemented
// in ../mr/master.go
//
// go run mrmaster.go pg*.txt
//
// Please do not change this file.
//

import "../mr"
import "time"
import "os"
import "fmt"

func main() {
	if len(os.Args) < 2 {
		fmt.Fprintf(os.Stderr, "Usage: mrmaster inputfiles...\n")
		os.Exit(1)
	}
	//a := os.Args[1:]
	//传递参数所有文件名给 (files, nReduce) *Master
	m := mr.MakeMaster(os.Args[1:], 10)
	//循环检验matser判断是否完成所有任务
	for m.Done() == false {
		time.Sleep(time.Second)
		m.TimeTick()	//每秒给一个时钟滴答
	}
	time.Sleep(time.Second)
}

master.go

package mr

import "log"
import "net"
import "os"
import "net/rpc"
import "net/http"
import "sync"
//import "strconv"
// import "fmt"


type Master struct {
	// Your definitions here.
	State int	//0:map 1:reduce 3:完成 
	NMap int	//最大并行map的个数,哈希的个数
	NReduce int	//最大并行reduce的个数,哈希的个数
	MapTask map[int]*Task	//map任务
	ReduceTask map[int]*Task	//reduce任务
	//MachineNum弃用,因为使用到os.Rename覆盖文件,我写到后面意识到是单机,文件可以直接覆盖不用考虑不同机器产生的输出文件会多余
	MachineNum int	//用来给机器分配machine id 从1开始分配 
	Mu sync.Mutex	//只能有一个worker访问
}

type Task struct {
	Filename string	//应该打开的文件名,如果是reduce任务,此项不必须
	//MachineId弃用,因为使用到os.Rename覆盖文件,我写到后面意识到是单机,文件可以直接覆盖不用考虑不同机器产生的输出文件会多余
	MachineId int 	//执行该任务的机器id
	State int	// 0 表示未做,1表示正在做,2表示完成
	Runtime int	//已经运行的时间,每次始终滴答,该记录+1
}

//更新所有task的Runtime+1,如果到达10则认为该机器挂了,需要一个新的机器去完成
func (m *Master)TimeTick() {
	m.Mu.Lock()
	if (m.State == 0) {
		for taskNumber, task := range(m.MapTask) {
			if (task.State == 1) {
				m.MapTask[taskNumber].Runtime = m.MapTask[taskNumber].Runtime + 1
				if (m.MapTask[taskNumber].Runtime >= 10) {	//超过10个始终滴答,默认认为该任务的主机已经挂了
					m.MapTask[taskNumber].State = 0
				}
			}
		}
	} else if (m.State == 1) {
		for taskNumber, task := range(m.ReduceTask) {	//遍历taskNumber
			if (task.State == 1) {
				m.ReduceTask[taskNumber].Runtime = m.ReduceTask[taskNumber].Runtime + 1
				if (m.ReduceTask[taskNumber].Runtime >= 10) {	
					m.ReduceTask[taskNumber].State = 0
				}
			}
		}
	}
	m.Mu.Unlock()
}

func (m *Master)UpdateMasterState() {
	for _, task := range(m.MapTask) {
		//fmt.Println("MapTask.State: ", task.State)
		if (task.State == 0 || task.State == 1) {
			m.State = 0	//处于map阶段
			return
		}
	}
	for _, task := range(m.ReduceTask) {
		if (task.State == 0 || task.State == 1) {
			m.State = 1	//处于reduce阶段
			return
		}
	}
	m.State = 2
}


// Your code here -- RPC handlers for the worker to call.
// 请求任务,返回的state,0:map 1:reduce 2:空转 3:完成
// reply.Filename=""则左右任务被接受了但是还没完全完成
func (m *Master) AskTask(args *AskArgs, reply *AskReply) error {
	m.Mu.Lock()
	// fmt.Println("m.State", m.State)
	reply.State = 2
	reply.NMap = m.NMap
	reply.NReduce = m.NReduce
	if (args.MachineId == 0) {	//分配机器号
		m.MachineNum = m.MachineNum + 1
		reply.MachineId = m.MachineNum
	} else {
		reply.MachineId = args.MachineId
	}
	if (m.State == 0) {	//map
		for taskNumber, task := range(m.MapTask) {
			if (task.State == 0) {
				reply.State = 0
				reply.Filename = task.Filename
				reply.TaskNumber = taskNumber
				m.MapTask[taskNumber].State = 1	//正在做
				break
			}
		}
	} else if (m.State == 1) {	//reduce
		// fmt.Println("m.ReduceTask: ", m.ReduceTask)
		for taskNumber, task := range(m.ReduceTask) {
			// fmt.Println("taskNumber, task =", taskNumber, task)
			if (task.State == 0) {
				reply.State = 1
				reply.TaskNumber = taskNumber
				m.ReduceTask[taskNumber].State = 1	//正在做
				break
			}
		}
	} else {
	}
	m.Mu.Unlock()
	return nil;
}

/*
* master收到FinishTask请求时更新当前master的状态
* 当master的状态是2,mapreduce全部完成时,reply的State置为0终止worker
*/
func (m *Master) FinishTask(args FinishArgs, reply *FinishReply) error {
	m.Mu.Lock()
	//println("master called FinishTask")
	reply.State = 0
	if (args.State == 0) {
		m.MapTask[args.TaskNumber].State = 2	//完成
		m.UpdateMasterState()
		// fmt.Print(m.State)
	} else if (args.State == 1) {
		m.ReduceTask[args.TaskNumber].State = 2
		m.UpdateMasterState()
		if (m.State == 2) {	//所有任务都已经完成
			reply.State = 1
		}
	}
	m.Mu.Unlock()
	return nil;
}

//
// start a thread that listens for RPCs from worker.go
//
func (m *Master) server() {
	//注册rpc使用http服务
	rpc.Register(m)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	sockname := masterSock()

	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}

//
// main/mrmaster.go calls Done() periodically to find out
// if the entire job has finished.
//
func (m *Master) Done() bool {
	//ret := false
	// Your code here.
	var ret bool;
	if (m.State == 2) {
		ret = true
	} else {
		ret = false
	}
	return ret
}

//
// create a Master.
// main/mrmaster.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeMaster(files []string, nReduce int) *Master {
	mapTask := make(map[int]*Task)
	reduceTask := make(map[int]*Task)
	for i, filename := range(files) {
		mapTask[i] = &Task{Filename: filename, MachineId: 0, State: 0, Runtime: 0}
	}
	for j := 0; j < nReduce; j++ {
		//mr-x-y  范围: x[0,7] y[0, 9], file是统一格式,因此不存它,work自己去拼
		//filename := "mr-" + strconv.Itoa(i) + "-" + strconv.Itoa(j)
		reduceTask[j] = &Task{Filename: "", MachineId: 0, State: 0, Runtime: 0}
	}
	m := Master{State: 0, NMap: len(files), NReduce: nReduce, MapTask: mapTask, ReduceTask: reduceTask, MachineNum: 0, Mu: sync.Mutex{}}
	m.server()
	return &m
}

mrworker.go和worker.go

mrworker.go

package main

//
// start a worker process, which is implemented
// in ../mr/worker.go. typically there will be
// multiple worker processes, talking to one master.
//
// go run mrworker.go wc.so
//
// Please do not change this file.
//

import "../mr"
import "plugin"
import "os"
import "fmt"
import "log"
import "time"
// import "unicode"
// import "strings"
// import "strconv"

func main() {
	if len(os.Args) != 2 {
		fmt.Fprintf(os.Stderr, "Usage: mrworker xxx.so\n")
		os.Exit(1)
	}

	mapf, reducef := loadPlugin(os.Args[1])
	time.Sleep(3 * time.Second)
	mr.Worker(mapf, reducef)
}

//
// load the application Map and Reduce functions
// from a plugin file, e.g. ../mrapps/wc.so
//
func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
	p, err := plugin.Open(filename)
	if err != nil {
		log.Fatalf("cannot load plugin %v", filename)
	}
	xmapf, err := p.Lookup("Map")
	if err != nil {
		log.Fatalf("cannot find Map in %v", filename)
	}
	mapf := xmapf.(func(string, string) []mr.KeyValue)
	xreducef, err := p.Lookup("Reduce")
	if err != nil {
		log.Fatalf("cannot find Reduce in %v", filename)
	}
	reducef := xreducef.(func(string, []string) string)

	return mapf, reducef
}


// func mapf(filename string, contents string) []mr.KeyValue {
// 	// function to detect word separators.
// 	ff := func(r rune) bool { return !unicode.IsLetter(r) }

// 	// split contents into an array of words.
// 	words := strings.FieldsFunc(contents, ff)

// 	kva := []mr.KeyValue{}
// 	for _, w := range words {
// 		kv := mr.KeyValue{w, "1"}
// 		kva = append(kva, kv)
// 	}
// 	return kva
// }

// //
// // The reduce function is called once for each key generated by the
// // map tasks, with a list of all the values created for that key by
// // any map task.
// //
// func reducef(key string, values []string) string {
// 	// return the number of occurrences of this word.
// 	return strconv.Itoa(len(values))
// }

worker.go

package mr

import "fmt"
import "log"
import "net/rpc"
import "hash/fnv"
import "os"
import "io/ioutil"
import "strconv"
import "encoding/json"
//import "time"
import "sort"
//
// Map functions return a slice of KeyValue.
//
// 
type KeyValue struct {
	Key   string
	Value string
}

// for sorting by key.
type ByKey []KeyValue

// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
// 将reduce任务分成10份,方便master进行管理?
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}


//
// main/mrworker.go calls this function.
//
/**
 *	1.rpc请求任务,判断是map还是reduce
 *	2.如果是map,请求key string, val string,(在本机读取k-v)执行map,执行完成将任务存储到磁盘上,通知master。
 *	3.如果是reduce,请求key,在磁盘上读取reduce任务并且执行,执行完成后通知master。
*/
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	// Your worker implementation here.
	// uncomment to send the Example RPC to the master.
	//CallExample()
	machineId := 0
	for {
		//睡眠一秒,再接下一个任务
		// time.Sleep(time.Second)
		args := AskArgs{MachineId: machineId}
		reply := AskReply{}
		CallAskTask(&args, &reply)
		machineId = reply.MachineId
		taskNumber := reply.TaskNumber
		//根据reply.State绝对做map还是reduce还是循环等待还是退出(全部完成)
		if (reply.State == 0) {//map
			//fmt.Println("正在做map ", reply.Filename)
			file, err := os.Open(reply.Filename)
			if err != nil {
				log.Fatalf("cannot open mapTask %v", reply.Filename)
			}
			content, err := ioutil.ReadAll(file)
			if err != nil {
				log.Fatalf("cannot read %v", reply.Filename)
			}
			file.Close()
			kva := mapf(reply.Filename, string(content))
			//写进mr-taskNumber-y文件中
			WriteMapOutput(kva, taskNumber,reply.NReduce)
		} else if (reply.State == 1) {//reduce
			//fmt.Println("正在做reduce ", reply.Filename)
			//读取json格式的mapoutput文件
			intermediate := []KeyValue{}
			nmap := reply.NMap
			for i := 0; i < nmap; i++ {
				mapOutFilename := "mr-" + strconv.Itoa(i) + "-" + strconv.Itoa(taskNumber)
				inputFile, err := os.OpenFile(mapOutFilename, os.O_RDONLY, 0777)
				if err != nil {
					log.Fatalf("cannot open reduceTask %v", reply.Filename)
				}
				dec := json.NewDecoder(inputFile)
				for {
					var kv []KeyValue
					if err := dec.Decode(&kv); err != nil {
						break
					}
					intermediate = append(intermediate, kv...)
				}
			}
			//排序写入reduceUutput文件,参考提供的mrsequential代码
			sort.Sort(ByKey(intermediate))
			oFilename := "mr-out-" + strconv.Itoa(taskNumber)
			tmpReduceOutFile, err := ioutil.TempFile("", "mr-reduce-*")
			//reduceOutputFile, err := os.OpenFile(oFilename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0777)
			if err != nil {
				log.Fatalf("cannot open %v", reply.Filename)
			}
			//reduceOutputFile, _ := os.OpenFile("mr-out-0", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0777)
			i := 0
			for i < len(intermediate) {
				j := i + 1
				for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
					j++
				}
				values := []string{}
				//合并value进字符串数组
				for k := i; k < j; k++ {
					values = append(values, intermediate[k].Value)
				}
				output := reducef(intermediate[i].Key, values)
				// this is the correct format for each line of Reduce output.
				fmt.Fprintf(tmpReduceOutFile, "%v %v\n", intermediate[i].Key, output)
				i = j
			}
			tmpReduceOutFile.Close()
			os.Rename(tmpReduceOutFile.Name(), oFilename)
		} else if (reply.State == 2) {	//空转
			continue
		} else if (reply.State == 3) {	//完成
			break
		}
		finishargs := FinishArgs{State: reply.State, TaskNumber: taskNumber}
		finishreply := FinishReply{}
		CallFinishTask(&finishargs, &finishreply)
		if (finishreply.State == 1) {
			break
		}
	}
}

/**
*	把KeyValue数组s中的每个KeyValue按照Key哈希以后取余nReduce决定存入的文件
* 	创建一个buf,防止一次写一个频繁的开关文件
**/
func WriteMapOutput(kva []KeyValue, taskNumber int, nReduce int) bool{
	buf := make([][]KeyValue, nReduce)
	for _, key_val := range(kva) {
		no := (ihash(key_val.Key)) % nReduce
		buf[no] = append(buf[no], key_val)
	}
	for no, key_val_nums := range(buf) { //json格式写入map的输出文件
		mapOutFilename := "mr-" + strconv.Itoa(taskNumber) + "-" + strconv.Itoa(no)
		tmpMapOutFile, error := ioutil.TempFile("", "mr-map-*")
		if error != nil {
			log.Fatalf("cannot open tmpMapOutFile")
		}
		//outFile, _ := os.OpenFile(mapOutFilename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0777)
		enc := json.NewEncoder(tmpMapOutFile)
		err := enc.Encode(key_val_nums)
		if err != nil {
			//fmt.Printf("write wrong!\n")
			return false
		}
		tmpMapOutFile.Close()
		os.Rename(tmpMapOutFile.Name(), mapOutFilename)
	}
	return true
}


func CallAskTask(args *AskArgs, reply *AskReply) {
	call("Master.AskTask", &args, &reply)
	//fmt.Println(reply)
}

func CallFinishTask(args *FinishArgs, reply *FinishReply) {
	call("Master.FinishTask", &args, &reply)
	//fmt.Println(reply)
}

//
// send an RPC request to the master, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	sockname := masterSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()
	//远程调用Master.Example(args, reply)
	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}
	fmt.Println(err)
	return false
}

总结

完成该LAB收获满满。

相关标签: 学习 LAB