goncdu/node/scan.go
2025-02-10 16:26:13 +08:00

194 lines
3.1 KiB
Go

package node
import (
"os"
"runtime"
"sync"
)
// ScanPath fills in pn from a single os.Lstat of pn.path (Lstat does not
// follow symbolic links).
//
//   - If Lstat fails, the node is marked UNKNOWN with a count of 1; its size
//     is left as it was.
//   - For a directory, the entries are listed with os.ReadDir and one child
//     node is linked per entry, using pn.path + "/" + entry name as the child
//     path. If the listing fails, no children are linked and the node is
//     marked UNKNOWN instead of DIR.
//   - A regular file is marked FILE; anything else is marked UNKNOWN.
//
// Whenever Lstat succeeds, size is set to the size reported by Lstat and count
// to 1. The returned error is always nil: failures are recorded on the node
// rather than reported to the caller.
func (pn *PathNode) ScanPath() error {
	info, statErr := os.Lstat(pn.path)
	if statErr != nil {
		// Unreadable path: record it as an unknown single entry and move on.
		pn.count = 1
		pn.node_type = UNKNOWN
		return nil
	}

	// Default classification; overwritten below for directories and files.
	var kind PathNodeType = UNKNOWN
	switch {
	case info.IsDir():
		if listing, readErr := os.ReadDir(pn.path); readErr == nil {
			for _, item := range listing {
				pn.linkChild(NewPathNode(pn.path + "/" + item.Name()))
			}
			kind = DIR
		}
	case info.Mode().IsRegular():
		kind = FILE
	}

	pn.node_type = kind
	pn.size = info.Size()
	pn.count = 1
	return nil
}
// IterScanNode scans pn and, recursively and sequentially, every node below
// it.
//
// The node is first populated with ScanPath. A non-directory node then calls
// pn.flushNode(pn.size, pn.count). A directory node first scans each of its
// children in order, then decrements its own count by one and calls
// pn.flushNode(0, 1).
//
// The first error returned by any ScanPath call aborts the walk and is
// returned unchanged.
func (pn *PathNode) IterScanNode() error {
	if err := pn.ScanPath(); err != nil {
		return err
	}

	// Anything that is not a directory has no children to descend into.
	if pn.node_type != DIR {
		pn.flushNode(pn.size, pn.count)
		return nil
	}

	for _, child := range pn.children {
		if err := child.IterScanNode(); err != nil {
			return err
		}
	}

	// NOTE(review): flushNode is defined elsewhere; the decrement followed by
	// flushNode(0, 1) presumably adjusts how the directory itself is counted —
	// confirm against flushNode.
	pn.count--
	pn.flushNode(0, 1)
	return nil
}
// FastIterScanNode scans pn and every node below it concurrently, using at
// most runtime.NumCPU() worker goroutines at any one time.
//
// Each node is populated with ScanPath. For a directory, every child is either
// handed to a new goroutine (while the worker limit has not been reached) or
// scanned inline by the current goroutine. Once a node has been processed and
// it has a parent, the result is pushed to the parent with flushNode:
// (0, 1) for a directory, (size, count) otherwise. All flushNode calls are
// serialized by a mutex.
//
// The function blocks until every worker has finished and returns the first
// error reported by ScanPath, or nil if there was none.
func (pn *PathNode) FastIterScanNode() error {
	var (
		wg     sync.WaitGroup
		nodeMu sync.Mutex // serializes flushNode calls
		wkMu   sync.Mutex // guards curWorker
	)

	// One slot combined with non-blocking sends: the first error is kept and
	// no worker can ever block while reporting a later one.
	errChan := make(chan error, 1)

	maxWorker := runtime.NumCPU()
	curWorker := 1 // accounts for the root worker started below

	var worker func(node *PathNode, isWorker bool)
	worker = func(node *PathNode, isWorker bool) {
		if isWorker {
			// Deferred so the worker slot and the WaitGroup are released on
			// every return path, including the error path.
			defer func() {
				wkMu.Lock()
				curWorker--
				wkMu.Unlock()
				wg.Done()
			}()
		}

		if err := node.ScanPath(); err != nil {
			select {
			case errChan <- err:
			default: // an earlier error is already recorded
			}
			return
		}

		if node.node_type == DIR {
			for _, entry := range node.children {
				// Reserve a worker slot under the lock, then act on the
				// decision outside of it.
				wkMu.Lock()
				spawn := curWorker < maxWorker
				if spawn {
					curWorker++
				}
				wkMu.Unlock()

				if spawn {
					wg.Add(1)
					go worker(entry, true)
				} else {
					worker(entry, false)
				}
			}
		}

		if node.parent != nil {
			nodeMu.Lock()
			if node.node_type == DIR {
				node.parent.flushNode(0, 1)
			} else {
				node.parent.flushNode(node.size, node.count)
			}
			nodeMu.Unlock()
		}
	}

	wg.Add(1)
	go worker(pn, true)

	// Wait for the whole scan so that no goroutine outlives this call.
	wg.Wait()
	close(errChan)

	// Yields the recorded error, or nil when the channel is empty and closed.
	return <-errChan
}
// FastChanIterScanNode scans pn and every node below it concurrently, using
// buffered channels as semaphores instead of mutexes.
//
// maxWorker bounds the number of worker goroutines running at once; a value
// less than or equal to zero selects runtime.NumCPU().
//
// Each node is populated with ScanPath. For a directory, every child is handed
// to a new goroutine when a worker slot is free, and is otherwise scanned
// inline by the current goroutine. Once a node has been processed and it has a
// parent, the result is pushed to the parent with flushNode: (0, 1) for a
// directory, (size, count) otherwise. All flushNode calls are serialized.
//
// The function blocks until every worker has finished and returns the first
// error reported by ScanPath, or nil if there was none.
func (pn *PathNode) FastChanIterScanNode(maxWorker int) error {
	if maxWorker <= 0 {
		maxWorker = runtime.NumCPU()
	}

	var wg sync.WaitGroup

	// One slot combined with non-blocking sends: the first error is kept and
	// no worker can ever block while reporting a later one.
	errChan := make(chan error, 1)

	// The semaphore channels are deliberately never closed: workers send on
	// them, and a send on a closed channel panics.
	nodeCh := make(chan struct{}, 1)       // binary semaphore around flushNode
	wkCh := make(chan struct{}, maxWorker) // one token per running worker

	var worker func(node *PathNode, isWorker bool)
	worker = func(node *PathNode, isWorker bool) {
		if isWorker {
			// Deferred so the worker token and the WaitGroup are released on
			// every return path, including the error path.
			defer func() {
				<-wkCh
				wg.Done()
			}()
		}

		if err := node.ScanPath(); err != nil {
			select {
			case errChan <- err:
			default: // an earlier error is already recorded
			}
			return
		}

		if node.node_type == DIR {
			for _, entry := range node.children {
				select {
				case wkCh <- struct{}{}:
					// A slot was free: scan this child in its own goroutine.
					wg.Add(1)
					go worker(entry, true)
				default:
					// All slots busy: scan the child in this goroutine.
					worker(entry, false)
				}
			}
		}

		if node.parent != nil {
			nodeCh <- struct{}{}
			if node.node_type == DIR {
				node.parent.flushNode(0, 1)
			} else {
				node.parent.flushNode(node.size, node.count)
			}
			<-nodeCh
		}
	}

	// The root worker takes the first token; the buffer holds at least one.
	wg.Add(1)
	wkCh <- struct{}{}
	go worker(pn, true)

	// Wait for the whole scan so that no goroutine outlives this call.
	wg.Wait()
	close(errChan)

	// Yields the recorded error, or nil when the channel is empty and closed.
	return <-errChan
}