MIT65840/src/mr/coordinator.go
2025-04-14 19:13:46 +08:00

326 lines
6.6 KiB
Go

package mr
import (
"errors"
"fmt"
"log"
"net"
"net/http"
"net/rpc"
"os"
"sync"
"time"
)
// machineDefaultLastSecond is the value written to Machine.LastSecond when a
// machine registers or refreshes itself (UpdateState, FlushTimeout). deamon
// decrements LastSecond once per second and drops the machine when it reaches
// zero, so this is the number of seconds a machine may stay silent.
const machineDefaultLastSecond = 3
// Coordinator holds the state of one MapReduce job: the map and reduce task
// tables, the queue workers pull tasks from, and the set of registered
// machines.
type Coordinator struct {
// mapTask has one entry per input file; SubmitTask sets the entry's done flag.
mapTask []MapTask
// mapSize is the number of input files (len(files) in MakeCoordinator).
mapSize int
// reduceTask has nReduce entries, indexed by reduce id.
reduceTask []ReduceTask
// reduceSize is nReduce as passed to MakeCoordinator.
reduceSize int
// taskQueue carries tasks waiting for a worker. It is created unbuffered,
// GetTask receives from it, and SubmitTask closes it once every reduce task
// is done.
taskQueue chan Task
// lock is held by the RPC handlers and by deamon while they touch the
// fields of this struct.
lock sync.Mutex
// machine maps a machine id to its registered record.
machine map[int]*Machine
// machineId is the id UpdateState hands to the next machine that registers.
machineId int
// running is a 1-buffered channel that always holds a single bool: true
// while the job is in progress, false once SubmitTask has seen the last
// reduce task complete. Done reads the value and puts it back.
running chan bool
}
// popTask returns the task held by machine to taskQueue so another worker can
// take it, then marks the machine as idle with no task. A machine that holds
// no task (TASK_NONE) is left untouched.
//
// The send blocks until something can receive from taskQueue, so callers that
// must not block run popTask in its own goroutine.
func (c *Coordinator) popTask(machine *Machine) {
	held := machine.Task
	if held.TaskType != TASK_NONE {
		DPrintf("machine %d pop task %d, task list is %v %v\n", machine.Id,
			held.TaskId, held.MapTask, held.ReduceTask)
		c.taskQueue <- held
		machine.Task.TaskType = TASK_NONE
		machine.State = MACHINE_IDLE
	}
}
// getMachine looks up the registered machine with the given id. It returns
// nil, after emitting a debug message, when no machine is stored under id.
// It reads c.machine without locking; the callers in this file hold c.lock.
func (c *Coordinator) getMachine(id int) *Machine {
	if m, found := c.machine[id]; found {
		return m
	}
	DPrintf("machine not found at %d\n", id)
	return nil
}
// Your code here -- RPC handlers for the worker to call.

// GetTask is the RPC a worker calls to obtain its next task.
//
// args.Id identifies the calling machine. On return reply describes what the
// worker should do:
//   - reply.State == MACHINE_DONE: the job is finished (or the task queue was
//     closed while waiting);
//   - reply.State == MACHINE_INIT: the coordinator does not know this machine
//     (never registered, or expired), so the worker must register again;
//   - otherwise reply is a copy of the machine record with State set to
//     MACHINE_READY and Task set to the assigned task.
//
// An error is returned if the machine already holds a task.
//
// The call blocks until a task is available. c.lock is released while waiting
// so that other RPCs and deamon can make progress.
func (c *Coordinator) GetTask(args *Machine, reply *Machine) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.Done() {
		reply.State = MACHINE_DONE
		return nil
	}
	machine := c.getMachine(args.Id)
	if machine == nil {
		reply.State = MACHINE_INIT
		return nil
	}
	if machine.Task.TaskType != TASK_NONE {
		return errors.New("task is not none")
	}

	// Wait for a task without holding the lock; the deferred Unlock pairs with
	// the Lock taken again right after the receive.
	c.lock.Unlock()
	task, ok := <-c.taskQueue
	c.lock.Lock()
	if !ok {
		reply.State = MACHINE_DONE
		return nil
	}

	// While the lock was released deamon may have expired this machine and
	// removed it from c.machine. Attaching the task to that orphaned record
	// would lose it for good: SubmitTask would reject the worker and deamon
	// would never requeue it. Hand the task back and ask the worker to
	// register again instead.
	if c.machine[args.Id] != machine {
		// taskQueue is unbuffered, so requeue from a goroutine to avoid
		// blocking while holding the lock.
		go func() { c.taskQueue <- task }()
		reply.State = MACHINE_INIT
		return nil
	}

	machine.Task = task
	machine.State = MACHINE_READY
	DPrintf("machine %d get task %d, task list is %v %v\n", machine.Id,
		machine.Task.TaskId, machine.Task.MapTask, machine.Task.ReduceTask)
	*reply = *machine
	return nil
}
// genReduceTask scans the current working directory for intermediate files
// and records each one in the Fnames list of the reduce task it belongs to.
//
// A directory entry is accepted when its name begins with "mr-<map>-<reduce>"
// where both numbers are non-negative and <reduce> is below c.reduceSize.
// Sscanf only matches a prefix, so trailing characters after the second
// number are tolerated. Everything else is ignored.
//
// The process exits via log.Fatalf if the directory cannot be read.
func (c *Coordinator) genReduceTask() {
	entries, err := os.ReadDir(".")
	if err != nil {
		log.Fatalf("read dir error: %v", err)
	}

	var mapId, reduceId int
	for _, entry := range entries {
		name := entry.Name()
		n, _ := fmt.Sscanf(name, "mr-%d-%d", &mapId, &reduceId)
		if n != 2 {
			continue
		}
		// %d accepts a sign, so a name like "mr-0--1" parses to a negative
		// id; reject it rather than index out of range.
		if mapId < 0 || reduceId < 0 || reduceId >= c.reduceSize {
			continue
		}
		c.reduceTask[reduceId].Fnames = append(c.reduceTask[reduceId].Fnames, name)
	}
}
// SubmitTask is the RPC a worker calls after finishing the task it was given.
//
// The finished task is taken from the coordinator's own record for machine
// args.Id, not from args. On return:
//   - reply.State == MACHINE_DONE if the job had already finished;
//   - reply.State == MACHINE_INIT if the machine is not registered;
//   - otherwise reply is a copy of the machine record, now idle with no task.
//
// Completing the last map task builds the reduce file lists (genReduceTask)
// and starts a goroutine that feeds every reduce task into taskQueue.
// Completing the last reduce task unregisters all machines, closes taskQueue
// and flips the running flag to false. The returned error is always nil.
func (c *Coordinator) SubmitTask(args *Machine, reply *Machine) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.Done() {
		reply.State = MACHINE_DONE
		return nil
	}
	worker := c.getMachine(args.Id)
	if worker == nil {
		reply.State = MACHINE_INIT
		return nil
	}

	switch finished := worker.Task; finished.TaskType {
	case TASK_MAP:
		c.mapTask[finished.TaskId].done = true
		mapsPending := false
		for i := range c.mapTask {
			if !c.mapTask[i].done {
				mapsPending = true
				break
			}
		}
		if !mapsPending {
			c.genReduceTask()
			DPrintf("end map task\n")
			// taskQueue is unbuffered, so the reduce tasks are fed from a
			// separate goroutine instead of blocking this handler.
			go func() {
				for id, rt := range c.reduceTask {
					c.taskQueue <- Task{
						MachineId:  -1,
						TaskId:     id,
						TaskType:   TASK_REDUCE,
						ReduceTask: &rt,
						MapTask:    nil,
					}
				}
			}()
		}
	case TASK_REDUCE:
		c.reduceTask[finished.TaskId].done = true
		reducesPending := false
		for i := range c.reduceTask {
			if !c.reduceTask[i].done {
				reducesPending = true
				break
			}
		}
		if !reducesPending {
			DPrintf("end reduce task\n")
			for _, m := range c.machine {
				delete(c.machine, m.Id)
			}
			close(c.taskQueue)
			// Swap the single value held in running from true to false.
			<-c.running
			c.running <- false
		}
	case TASK_NONE:
		log.Print("submit tasknone")
	}

	// worker stays a valid pointer even if it was just removed from the map.
	worker.State = MACHINE_IDLE
	worker.Task.TaskType = TASK_NONE
	*reply = *worker
	return nil
}
// FlushTimeout is the heartbeat RPC: it resets the LastSecond countdown of
// the machine whose id is args back to machineDefaultLastSecond.
//
// reply is not written. The call succeeds without doing anything once the job
// is done, and returns an error when no machine with that id is registered.
func (c *Coordinator) FlushTimeout(args int, reply *Empty) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.Done() {
		return nil
	}
	if m := c.getMachine(args); m != nil {
		m.LastSecond = machineDefaultLastSecond
		return nil
	}
	return errors.New("flush machine not found")
}
// UpdateState is the RPC a worker uses to register itself or to refresh its
// registration.
//
//   - args.State == MACHINE_INIT: a new machine is created with the next free
//     id, marked MACHINE_IDLE, stored, and copied into reply.
//   - args.Id unknown: reply is a copy of args with State set to MACHINE_INIT,
//     telling the worker to register again.
//   - otherwise: the machine's LastSecond countdown is reset and the stored
//     record is copied into reply.
//
// The returned error is always nil.
func (c *Coordinator) UpdateState(args *Machine, reply *Machine) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	if args.State == MACHINE_INIT {
		id := c.machineId
		c.machineId++
		fresh := MakeMachine(id, machineDefaultLastSecond)
		fresh.State = MACHINE_IDLE
		c.machine[id] = fresh
		*reply = *fresh
		return nil
	}

	known, found := c.machine[args.Id]
	if !found {
		*reply = *args
		reply.State = MACHINE_INIT
		return nil
	}

	known.LastSecond = machineDefaultLastSecond
	*reply = *known
	return nil
}
// deamon is the liveness monitor; it loops forever and is meant to run in its
// own goroutine. Once per second it takes c.lock, decrements the LastSecond
// countdown of every registered machine, and unregisters each machine whose
// countdown has reached zero, handing that machine's task back to the queue.
func (c *Coordinator) deamon() {
	for {
		time.Sleep(time.Second)

		c.lock.Lock()
		for _, m := range c.machine {
			if m.LastSecond > 0 {
				m.LastSecond--
			}
			if m.LastSecond != 0 {
				continue
			}
			// popTask can block on the unbuffered taskQueue, so it works on a
			// snapshot of the machine in a separate goroutine while this loop
			// keeps going.
			expired := *m
			go c.popTask(&expired)
			DPrintf("delete machine %d\n", m.Id)
			delete(c.machine, m.Id)
		}
		c.lock.Unlock()
	}
}
// Example is a sample RPC handler: it stores args.X + 1 in reply.Y and
// returns nil.
//
// The RPC argument and reply types are defined in rpc.go.
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	next := args.X + 1
	reply.Y = next
	return nil
}
// server registers the coordinator's RPC methods and starts a goroutine that
// serves them over HTTP on the unix socket named by coordinatorSock. Any
// existing file at that path is removed first. The process exits via
// log.Fatal if the socket cannot be opened.
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	socketPath := coordinatorSock()
	os.Remove(socketPath)
	listener, err := net.Listen("unix", socketPath)
	if err != nil {
		log.Fatal("listen error:", err)
	}
	go http.Serve(listener, nil)
}
// Done reports whether the entire job has finished.
// main/mrcoordinator.go calls it periodically.
//
// The running channel always holds exactly one bool; Done takes it out, puts
// the same value back, and returns its negation.
func (c *Coordinator) Done() bool {
	stillRunning := <-c.running
	c.running <- stillRunning
	return !stillRunning
}
// MakeCoordinator creates a Coordinator for the given input files and starts
// it. main/mrcoordinator.go calls this function.
//
// One map task is created per entry in files, and nReduce is the number of
// reduce tasks to use. Before returning, the function starts a goroutine that
// feeds the map tasks into the task queue, starts the deamon liveness
// monitor, and starts the RPC server.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	nMap := len(files)
	c := &Coordinator{
		mapTask:    make([]MapTask, nMap),
		mapSize:    nMap,
		reduceTask: make([]ReduceTask, nReduce),
		reduceSize: nReduce,
		taskQueue:  make(chan Task),
		machine:    make(map[int]*Machine),
		machineId:  1,
		lock:       sync.Mutex{},
		running:    make(chan bool, 1),
	}

	// The running channel holds true until the last reduce task is submitted.
	c.running <- true

	for i, name := range files {
		c.mapTask[i] = MapTask{Fname: name, NReduce: nReduce}
	}
	for i := range c.reduceTask {
		c.reduceTask[i] = ReduceTask{Fnames: make([]string, 0)}
	}

	// taskQueue is unbuffered, so the map tasks are handed out from a
	// goroutine as workers ask for them.
	go func() {
		for id, mt := range c.mapTask {
			c.taskQueue <- Task{
				MachineId:  -1,
				TaskType:   TASK_MAP,
				MapTask:    &mt,
				ReduceTask: nil,
				TaskId:     id,
			}
		}
	}()

	go c.deamon()
	c.server()
	return c
}