From 073d8283acfbbafe693b0f5296150320c852f5ec Mon Sep 17 00:00:00 2001 From: zzy <2450266535@qq.com> Date: Mon, 14 Apr 2025 19:13:46 +0800 Subject: [PATCH] tests 10 times ok --- src/mr/coordinator.go | 297 ++++++++++++++++++++++++++++++++---------- src/mr/rpc.go | 60 ++++++--- src/mr/worker.go | 112 ++++++++++++---- 3 files changed, 355 insertions(+), 114 deletions(-) diff --git a/src/mr/coordinator.go b/src/mr/coordinator.go index 8998958..6b70087 100644 --- a/src/mr/coordinator.go +++ b/src/mr/coordinator.go @@ -1,95 +1,239 @@ package mr import ( + "errors" "fmt" "log" "net" "net/http" "net/rpc" "os" + "sync" + "time" ) +const machineDefaultLastSecond = 3 + type Coordinator struct { // Your definitions here. mapTask []MapTask - recudeTask []ReduceTask + mapSize int + reduceTask []ReduceTask + reduceSize int - mapTaskQueue chan int - reduceTaskQueue chan int - running chan bool + taskQueue chan Task - lock chan bool + lock sync.Mutex + machine map[int]*Machine + machineId int + + running chan bool +} + +func (c *Coordinator) popTask(machine *Machine) { + if machine.Task.TaskType == TASK_NONE { + return + } + DPrintf("machine %d pop task %d, task list is %v %v\n", machine.Id, + machine.Task.TaskId, machine.Task.MapTask, machine.Task.ReduceTask) + c.taskQueue <- machine.Task + machine.Task.TaskType = TASK_NONE + machine.State = MACHINE_IDLE +} + +func (c *Coordinator) getMachine(id int) *Machine { + machine, ok := c.machine[id] + if !ok { + DPrintf("machine not found at %d\n", id) + return nil + } + return machine } // Your code here -- RPC handlers for the worker to call. -func (c *Coordinator) GetJob(args *Empty, reply *Task) error { - select { - case id := <-c.mapTaskQueue: - reply.MapTask = &c.mapTask[id] - reply.MapTask.TaskId = id - reply.MapTask.State = PROGRESS +func (c *Coordinator) GetTask(args *Machine, reply *Machine) error { + c.lock.Lock() + defer c.lock.Unlock() - reply.TaskType = TASK_MAP - case id := <-c.reduceTaskQueue: - reply.ReduceTask = &c.recudeTask[id] - reply.ReduceTask.TaskId = id - reply.ReduceTask.State = PROGRESS - - entries, err := os.ReadDir(".") - if err != nil { - log.Fatal("read dir error") - } - var mapId, reduceId int - for _, entry := range entries { - - n, _ := fmt.Sscanf(entry.Name(), "mr-%d-%d", &mapId, &reduceId) - if n == 2 && reduceId == reply.ReduceTask.TaskId { - // log.Printf("reduce task %d find file %s", reply.ReduceTask.TaskId, entry.Name()) - reply.ReduceTask.Fnames = append(reply.ReduceTask.Fnames, entry.Name()) - } - } - - reply.TaskType = TASK_REDUCE + if c.Done() { + reply.State = MACHINE_DONE + return nil } + + machine := c.getMachine(args.Id) + if machine == nil { + reply.State = MACHINE_INIT + return nil + } + + if machine.Task.TaskType != TASK_NONE { + return errors.New("task is not none") + } + + c.lock.Unlock() + task, ok := <-c.taskQueue + c.lock.Lock() + if !ok { + reply.State = MACHINE_DONE + return nil + } + machine.Task = task + machine.State = MACHINE_READY + + DPrintf("machine %d get task %d, task list is %v %v\n", machine.Id, + machine.Task.TaskId, machine.Task.MapTask, machine.Task.ReduceTask) + *reply = *machine return nil } -func (c *Coordinator) SubmitJob(args *Task, reply *Empty) error { - <-c.lock - defer func() { c.lock <- true }() +func (c *Coordinator) genReduceTask() { + var mapId, reduceId int + entries, err := os.ReadDir(".") + if err != nil { + log.Fatal("read dir error") + } + for _, entry := range entries { + n, _ := fmt.Sscanf(entry.Name(), "mr-%d-%d", &mapId, &reduceId) + if n == 2 && reduceId < c.reduceSize && mapId >= 0 { + // log.Printf("reduce task %d find file %s", reply.ReduceTask.TaskId, entry.Name()) + c.reduceTask[reduceId].Fnames = append(c.reduceTask[reduceId].Fnames, entry.Name()) + } + } +} - switch args.TaskType { +func (c *Coordinator) SubmitTask(args *Machine, reply *Machine) error { + c.lock.Lock() + defer c.lock.Unlock() + if c.Done() { + reply.State = MACHINE_DONE + return nil + } + + machine := c.getMachine(args.Id) + if machine == nil { + reply.State = MACHINE_INIT + return nil + } + task := machine.Task + + switch task.TaskType { case TASK_MAP: - // log.Print("submit map task") - c.mapTask[args.MapTask.TaskId].State = COMPELETED + // log.Printf("submit map task %d\n", task.TaskId) + c.mapTask[task.TaskId].done = true for _, task := range c.mapTask { - if task.State != COMPELETED { - return nil + if !task.done { + goto END } } - log.Print("start reduce task") - for _, task := range c.recudeTask { - c.reduceTaskQueue <- task.TaskId - } + c.genReduceTask() + DPrintf("end map task\n") + go func() { + for i, task := range c.reduceTask { + c.taskQueue <- Task{ + MachineId: -1, + TaskId: i, + TaskType: TASK_REDUCE, + ReduceTask: &task, + MapTask: nil, + } + } + }() + // log.Printf("task queue is %+v %+v\n", c.reduceTask, c.taskQueue) case TASK_REDUCE: // log.Print("submit reduce task") - c.recudeTask[args.ReduceTask.TaskId].State = COMPELETED - for _, task := range c.recudeTask { - if task.State != COMPELETED { - return nil + c.reduceTask[task.TaskId].done = true + for _, task := range c.reduceTask { + if !task.done { + goto END } } - log.Print("end") - // close(c.reduceTaskQueue) - // close(c.mapTaskQueue) + DPrintf("end reduce task\n") + for _, m := range c.machine { + delete(c.machine, m.Id) + } + close(c.taskQueue) + <-c.running c.running <- false + case TASK_NONE: + log.Print("submit tasknone") } + +END: + machine.State = MACHINE_IDLE + machine.Task.TaskType = TASK_NONE + *reply = *machine return nil } +func (c *Coordinator) FlushTimeout(args int, reply *Empty) error { + c.lock.Lock() + defer c.lock.Unlock() + if c.Done() { + return nil + } + + machine := c.getMachine(args) + if machine == nil { + return errors.New("flush machine not found") + } + + machine.LastSecond = machineDefaultLastSecond + return nil +} + +func (c *Coordinator) UpdateState(args *Machine, reply *Machine) error { + c.lock.Lock() + defer c.lock.Unlock() + + if args.State == MACHINE_INIT { + newId := c.machineId + c.machineId += 1 + + machine := MakeMachine(newId, machineDefaultLastSecond) + machine.State = MACHINE_IDLE + + *reply = *machine + c.machine[newId] = machine + // log.Printf("machine %d init map is %v\n", newId, c.machine) + return nil + } + + machine, ok := c.machine[args.Id] + if !ok { + *reply = *args + reply.State = MACHINE_INIT + return nil + } + + machine.LastSecond = machineDefaultLastSecond + *reply = *machine + return nil +} + +func (c *Coordinator) deamon() { + for { + time.Sleep(time.Second) + + c.lock.Lock() + for _, m := range c.machine { + if m.LastSecond > 0 { + m.LastSecond -= 1 + } + if m.LastSecond == 0 { + new_m := Machine{} + new_m = *m + go c.popTask(&new_m) + DPrintf("delete machine %d\n", m.Id) + delete(c.machine, m.Id) + } + } + c.lock.Unlock() + } +} + // an example RPC handler. // // the RPC argument and reply types are defined in rpc.go. @@ -130,40 +274,51 @@ func (c *Coordinator) Done() bool { // nReduce is the number of reduce tasks to use. func MakeCoordinator(files []string, nReduce int) *Coordinator { c := Coordinator{ - mapTask: make([]MapTask, len(files)), - recudeTask: make([]ReduceTask, nReduce), - mapTaskQueue: make(chan int, len(files)), - reduceTaskQueue: make(chan int, nReduce), - running: make(chan bool, 1), - lock: make(chan bool, 1), + mapTask: make([]MapTask, len(files)), + mapSize: len(files), + + reduceTask: make([]ReduceTask, nReduce), + reduceSize: nReduce, + + taskQueue: make(chan Task), + + machine: make(map[int]*Machine), + machineId: 1, + lock: sync.Mutex{}, + running: make(chan bool, 1), } // Your code here. c.running <- true - c.lock <- true for i, file := range files { c.mapTask[i] = MapTask{ - Fname: file, - State: IDLE, - NReduce: nReduce, - TaskId: i, - MachineId: -1, + Fname: file, + NReduce: nReduce, + done: false, } } for i := range nReduce { - c.recudeTask[i] = ReduceTask{ - Fnames: make([]string, 0), - State: IDLE, - TaskId: i, - MachineId: -1, + c.reduceTask[i] = ReduceTask{ + Fnames: make([]string, 0), + done: false, } } - for _, task := range c.mapTask { - c.mapTaskQueue <- task.TaskId - } + go func() { + for i, task := range c.mapTask { + c.taskQueue <- Task{ + MachineId: -1, + TaskType: TASK_MAP, + MapTask: &task, + ReduceTask: nil, + TaskId: i, + } + } + }() + + go c.deamon() c.server() return &c diff --git a/src/mr/rpc.go b/src/mr/rpc.go index 08ad386..0dc9b72 100644 --- a/src/mr/rpc.go +++ b/src/mr/rpc.go @@ -7,6 +7,7 @@ package mr // import ( + "fmt" "os" "strconv" ) @@ -26,42 +27,71 @@ type ExampleReply struct { // Add your RPC definitions here. -type State int +func DPrintf(format string, a ...any) (int, error) { + const Debug = false + var n int + var err error + if Debug { + n, err = fmt.Printf(format, a...) + } + return n, err +} + +type MachineState int const ( - IDLE State = iota - PROGRESS - COMPELETED + MACHINE_INIT MachineState = iota + MACHINE_IDLE + MACHINE_READY + MACHINE_RUNNING + MACHINE_DONE ) +type Machine struct { + State MachineState + Id int + LastSecond int + Task Task +} + +func MakeMachine(id int, lastSecond int) *Machine { + return &Machine{ + State: MACHINE_INIT, + Id: id, + LastSecond: lastSecond, + Task: Task{ + TaskType: TASK_NONE, + MapTask: nil, + ReduceTask: nil, + }, + } +} + type TaskType int const ( - TASK_IDLE TaskType = iota + TASK_NONE TaskType = iota TASK_MAP TASK_REDUCE - TASK_DONE ) type Task struct { + TaskId int + MachineId int TaskType TaskType MapTask *MapTask ReduceTask *ReduceTask } type MapTask struct { - Fname string - State State - NReduce int - TaskId int - MachineId int + Fname string + NReduce int + done bool } type ReduceTask struct { - Fnames []string - State State - MachineId int - TaskId int + Fnames []string + done bool } type Empty struct { diff --git a/src/mr/worker.go b/src/mr/worker.go index bbea2fd..a6bacf5 100644 --- a/src/mr/worker.go +++ b/src/mr/worker.go @@ -9,6 +9,8 @@ import ( "net/rpc" "os" "sort" + "sync/atomic" + "time" ) // Map functions return a slice of KeyValue. @@ -34,32 +36,84 @@ func Worker(mapf func(string, string) []KeyValue, // uncomment to send the Example RPC to the coordinator. // CallExample() - empty := Empty{} - for { - reply := Task{} - ok := call("Coordinator.GetJob", &empty, &reply) - if !ok { - break - } + machine := MakeMachine(-1, -1) + var id atomic.Int32 - switch reply.TaskType { - case TASK_MAP: - kva := workMap(mapf, reply.MapTask) - saveMap(kva, reply.MapTask) - call("Coordinator.SubmitJob", &reply, &empty) - case TASK_REDUCE: - workRecude(reducef, reply.ReduceTask) - call("Coordinator.SubmitJob", &reply, &empty) - case TASK_IDLE: - case TASK_DONE: - goto END + for { + reply := Machine{} + state := machine.State + switch state { + case MACHINE_INIT: + // log.Printf("worker init %d\n", machine.Id) + if ok := call("Coordinator.UpdateState", machine, &reply); !ok { + reply.State = MACHINE_DONE + } else { + id.Store(int32(reply.Id)) + go func() { + for { + time.Sleep(time.Second) + rid := id.Load() + if ok := call("Coordinator.FlushTimeout", rid, &Empty{}); !ok { + log.Printf("FlushTimeout error %v\n", ok) + } + } + }() + } + machine = &reply + case MACHINE_READY: + if ok := runOnceTask(machine, mapf, reducef); !ok { + machine.State = MACHINE_IDLE + } + case MACHINE_IDLE: + // log.Printf("worker idle %d\n", machine.Id) + machine.Task.TaskType = TASK_NONE + if ok := call("Coordinator.GetTask", machine, &reply); !ok { + reply.State = MACHINE_DONE + } + machine = &reply + case MACHINE_DONE: + return + default: + goto SLEEP } + continue + SLEEP: + time.Sleep(time.Second) } -END: } -func workMap(mapf func(string, string) []KeyValue, params *MapTask) []KeyValue { - filename := params.Fname +func runOnceTask(machine *Machine, + mapf func(string, string) []KeyValue, + reducef func(string, []string) string) bool { + machine.State = MACHINE_RUNNING + task := &machine.Task + + switch task.TaskType { + case TASK_MAP: + kva := workMap(mapf, task) + saved_fname := saveMap(kva, task) + if saved_fname == nil { + return false + } + case TASK_REDUCE: + workRecude(reducef, task) + default: + log.Printf("unknown task type %d", task.TaskType) + return false + } + + // log.Printf("submit task %d %+v %+v", task.TaskId, task.MapTask, task.ReduceTask) + reply := Machine{} + if ok := call("Coordinator.SubmitTask", machine, &reply); !ok { + return false + } + *machine = reply + + return true +} + +func workMap(mapf func(string, string) []KeyValue, params *Task) []KeyValue { + filename := params.MapTask.Fname file, err := os.Open(filename) if err != nil { @@ -74,10 +128,11 @@ func workMap(mapf func(string, string) []KeyValue, params *MapTask) []KeyValue { return kva } -func saveMap(kva []KeyValue, params *MapTask) []string { - fnames := make([]string, params.NReduce) - files := make([]*os.File, params.NReduce) - encodes := make([]*json.Encoder, params.NReduce) +func saveMap(kva []KeyValue, params *Task) []string { + nReduce := params.MapTask.NReduce + fnames := make([]string, nReduce) + files := make([]*os.File, nReduce) + encodes := make([]*json.Encoder, nReduce) for i := range fnames { fnames[i] = fmt.Sprintf("mr-%d-%d", params.TaskId, i) @@ -99,7 +154,7 @@ func saveMap(kva []KeyValue, params *MapTask) []string { }() for _, kv := range kva { - enc := encodes[ihash(kv.Key)%params.NReduce] + enc := encodes[ihash(kv.Key)%nReduce] if err := enc.Encode(&kv); err != nil { log.Fatalf("cannot encode %v", kv) break @@ -117,11 +172,12 @@ func (a ByKey) Len() int { return len(a) } func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key } -func workRecude(reducef func(string, []string) string, params *ReduceTask) { +func workRecude(reducef func(string, []string) string, params *Task) { reduceId := params.TaskId + fnames := params.ReduceTask.Fnames intermediate := []KeyValue{} - for _, fname := range params.Fnames { + for _, fname := range fnames { file, err := os.Open(fname) if err != nil { log.Fatal("can't open file")