欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MIT 6.824 Lab1 MapReduce

程序员文章站 2022-07-12 17:39:38
...

概述

本文章主要讲述lab1的基本实现思路,具体的实验要求见MIT-Lab1。实验代码见Lab代码

基本需求

  • 一个Coordinator管理多个Worker,通过RPC进行通信
  • Worker向Corrdinator请求任务,Coordinator向Worker分配任务
  • Coordinator能够处理Worker Crash

基本数据结构

Coordinator

type Coordinator struct {
	// Your definitions here.
	nReduce     int
	nMap        int
	workerLists sync.Map
	startReduce chan bool

	// MapTask
	muMapTask       sync.Mutex
	mapTaskNeedExec int
	mapTaskLists    []*MapTask
	mapTaskQueue    chan *MapTask

	// ReduceTask
	muReduceTask       sync.Mutex
	reduceTaskNeedExec int
	reduceTaskLists    []*ReduceTask
	reduceTaskQueue    chan *ReduceTask
}

workerLists用来管理Worker所有Worker的状态,mapTaskQueue和reduceTaskQueue为并发队列,用于Worker并发获取任务,mapTaskLists和reduceTaskLists用于存储所有的Task。

Worker

type worker struct {
	id       string
	nReduce  int
	needExit chan bool
}

needExit同于判断当前Worker是否可以退出,即所有任务已经完成。

具体功能

Worker注册

每个Worker新加入集群时,都要向Coordinator发起注册,Coordinator收到注册请求后,会进行合法性判断,如果合法则加入到workerLists中

// worker.go
func (w *worker) register() {
	w.id = strconv.Itoa(os.Getpid())
	reply := RegisterReply{}
	args := RegisterArgs{WorkerID: w.id}
	call("Coordinator.Register", &args, &reply)
	w.nReduce = reply.ReduceNum
}

// coordinator.go
func (c *Coordinator) Register(args *RegisterArgs, reply *RegisterReply) error {
	workerID := args.WorkerID
	_, exist := c.workerLists.Load(workerID)

	if exist {
		return errors.New(ErrDuplicateWorker)
	}
	reply.ReduceNum = c.nReduce
	worker := workerRecord{currTaskId: -1}
	c.workerLists.Store(workerID, &worker)
	return nil
}

任务请求与执行

Worker会启动一个线程不断循环向Coordinator获取任务,调用相应的Map的Reduce函数,并在任务执行完成后通知Coordinator。如果Coordinator通知Worker所有任务都已经结束,则Worker进程可以直接退出。

// worker.go
func (w *worker) requestAndReportTask(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	for {
		reply := RequestTaskReply{}
		call("Coordinator.DispatchTask", &RequestTaskArgs{w.id}, &reply)

		taskType := reply.TaskType
		taskId := reply.TaskId

		reportArgs := CompleteReportArgs{TaskId: taskId, WorkerID: w.id, TaskType: taskType}
		reportReply := CompleteReportReply{}

		switch taskType {
		case Map:
			res := mapf(reply.Content.(KeyValue).Key, reply.Content.(KeyValue).Value)
			reportArgs.TempFiles = tempFiles
			call("Coordinator.CompleteReport", &reportArgs, &reportReply)
		case Reduce:
			kv := KeyValueSlice(reply.Content.([]KeyValue))
			sort.Sort(kv)
			result := []KeyValue{}
			for i := 0; i < len(kv); {
				j := i + 1
				for j < len(kv) && kv[j].Key == kv[i].Key {
					j++
				}
				values := []string{}
				for k := i; k < j; k++ {
					values = append(values, kv[k].Value)
				}
				output := reducef(kv[i].Key, values)
				result = append(result, KeyValue{Key: kv[i].Key, Value: output})
				i = j
			}
			
			if reportReply.TaskFinished {
				w.needExit <- true
				os.Exit(1)
			}

		case NoTask:
			time.Sleep(time.Second)
		}
	}
}

Coordinator在分发任务时,会从并发队列中获取要执行的Task,并且发送给请求的Worker。

// coordinator.go
func (c *Coordinator) DispatchTask(args *RequestTaskArgs, reply *RequestTaskReply) error {
	workerId := args.WorkerID
	record, exist := c.workerLists.Load(workerId)

	if !exist {
		return errors.New(ErrNoWorker)
	}

	record.(*workerRecord).muTime.Lock()
	record.(*workerRecord).taskStartTime = time.Now()
	record.(*workerRecord).muTime.Unlock()

	select {
	case mTask := <-c.mapTaskQueue:
		reply.Content = KeyValue{Key: mTask.fileName, Value: mTask.content}
		reply.TaskType = Map
		reply.TaskId = mTask.id
		record.(*workerRecord).muTask.Lock()
		//fmt.Printf("worker %v get task: %d\n", workerId, mTask.id)
		record.(*workerRecord).currTaskId = mTask.id
		record.(*workerRecord).muTask.Unlock()
	case rTask := <-c.reduceTaskQueue:
		reply.Content = rTask.content
		reply.TaskType = Reduce
		reply.TaskId = rTask.id
		record.(*workerRecord).muTask.Lock()
		record.(*workerRecord).currTaskId = rTask.id
		record.(*workerRecord).muTask.Unlock()
	default:
		reply.TaskType = NoTask
	}

	return nil
}

宕机处理

在Worker的任务10s内没有执行完成时,我们判断该Worker宕机,将其正在执行的任务分配到其他空闲的Worker上。Coordinator端会开启一个线程每秒查询所有Worker的任务执行状态,如果该Worker宕机,则将其执行的任务重新放入缓冲队列中。

// coordinator.go
func (c *Coordinator) checkAlive() {
	for {
		t := time.NewTimer(time.Second)
		<-t.C
		c.workerLists.Range(func(k, v interface{}) bool {
			//fmt.Printf("check worker:%v\n", k)
			v.(*workerRecord).muTask.Lock()
			defer v.(*workerRecord).muTask.Unlock()

			if v.(*workerRecord).currTaskId == -1 {
				//fmt.Printf("worker:%v no task\n", k)
				return true
			}

			t := time.Now()
			v.(*workerRecord).muTime.Lock()
			duration := t.Sub(v.(*workerRecord).taskStartTime)
			v.(*workerRecord).muTime.Unlock()

			if duration > (time.Second * AliceCheckDuration) {
				c.workerLists.Delete(k)
				//fmt.Printf("worker %v offline, crash occured\n", k)

				c.muMapTask.Lock()
				if c.mapTaskNeedExec != 0 {
					c.mapTaskQueue <- c.mapTaskLists[v.(*workerRecord).currTaskId]
				}
				c.muMapTask.Unlock()

				c.muReduceTask.Lock()
				if c.reduceTaskNeedExec != 0 {
					c.reduceTaskQueue <- c.reduceTaskLists[v.(*workerRecord).currTaskId]
				}
				c.muReduceTask.Unlock()
			}
			return true
		})
	}
}

在处理宕机时,有个问题就是可能出现一个任务重复执行的情况。比如Worker a执行Task a时由于CPU调度等因素超过了10s,此时我们将Task a重新分配给Worker b继续执行,这时Worker a上的Task a还可能会执行。为了解决这个问题,我们采用了MapReduce中的基于临时文件的解决方案,每个Worker把结果写入临时文件,由Coordinator将其重命名,由于重命名操作是原子操作,因此不会发生写冲突。

总结

本文只展示了部分重要代码,完整代码和数据结构见 MIT-6.24 lab 代码