- 将 `resolved_deps` 重命名为 `deps` 并新增 `dep_map` 用于存储依赖关系
- 新增 `get_dependencies_of` 方法以获取指定包的直接依赖列表
- 实现递归打印依赖树的功能,优化 `tree` 命令展示效果
- 引入分层命令行参数解析,支持子命令及更多选项(如 --record、--args 等)
- 改进日志输出与构建模式提示信息,使其更准确反映当前构建状态
- 编译器类中增加命令记录机制,便于调试和回溯执行过程
1049 lines
32 KiB
Python
1049 lines
32 KiB
Python
"""cbuild.py"""
|
||
|
||
from abc import ABC, abstractmethod
|
||
import tomllib
|
||
import pprint
|
||
import subprocess
|
||
from pathlib import Path
|
||
from dataclasses import asdict, dataclass, field
|
||
from enum import Enum, auto
|
||
import logging
|
||
import hashlib
|
||
from graphlib import TopologicalSorter
|
||
import shutil
|
||
import argparse
|
||
from typing import Self
|
||
import time
|
||
import sys
|
||
import json
|
||
|
||
|
||
class ColorFormatter(logging.Formatter):
    """Log formatter that colors the ``name - LEVEL:`` prefix per log level."""

    # ANSI escape sequences used to assemble the per-level templates.
    cyan = "\x1b[36m"  # DEBUG
    green = "\x1b[32m"  # INFO
    blue = "\x1b[34m"  # labelled TRACE; not referenced by FORMATS below
    yellow = "\x1b[33m"  # WARNING
    red = "\x1b[31m"  # ERROR
    line_red = "\x1b[31;4m"  # CRITICAL / FATAL: red with underline
    bold = "\x1b[1m"
    reset = "\x1b[0m"

    # Template fragments; ``time`` is defined but not used by FORMATS below.
    fmt = "%(name)s - %(levelname)s: "
    time = "%(asctime)s"
    msg = "%(message)s"
    file = " - (%(filename)s:%(lineno)d)"

    # Level number -> complete format string. DEBUG and CRITICAL/FATAL also
    # append the source location of the logging call.
    FORMATS = {
        logging.DEBUG: bold + cyan + fmt + reset + msg + file,
        logging.INFO: bold + green + fmt + reset + msg,
        logging.WARNING: bold + yellow + fmt + reset + msg,
        logging.ERROR: bold + red + fmt + reset + msg,
        logging.CRITICAL: bold + line_red + fmt + reset + msg + file,
        logging.FATAL: bold + line_red + fmt + reset + msg + file,
    }

    def format(self, record):
        """Render *record* with the template registered for its level.

        Levels without an entry in ``FORMATS`` are rendered with the default
        ``logging.Formatter`` template.
        """
        template = self.FORMATS.get(record.levelno)
        return logging.Formatter(template).format(record)
|
||
|
||
|
||
def setup_logger() -> logging.Logger:
    """Return this module's logger with a colorized console handler attached.

    The handler is attached at most once: calling this function again returns
    the same logger without adding a second handler, so records are never
    emitted twice.

    Returns:
        logging.Logger: The logger registered under this module's name.
    """
    _logger = logging.getLogger(__name__)
    # A handler carrying a ColorFormatter means a previous call already
    # configured this logger.
    already_configured = any(
        isinstance(handler.formatter, ColorFormatter) for handler in _logger.handlers
    )
    if not already_configured:
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(ColorFormatter())
        _logger.addHandler(console_handler)
    return _logger
|
||
|
||
|
||
# Module-level logger used by the classes and functions in this file.
logger = setup_logger()
|
||
|
||
|
||
@dataclass
class Dependency:
    """One entry of a package's ``dependencies`` list."""

    # Name given in the dependency entry.
    name: str
    # Location of the dependency; DependencyResolver joins it onto the
    # directory of the package that declares it.
    path: str
    # Declared version string.
    version: str = "0.0.0"
    # Flag read from the entry's ``optional`` key.
    optional: bool = False
|
||
|
||
|
||
@dataclass
class Feature:
    """One entry of a package's ``features`` configuration."""

    # Feature name.
    name: str
    # Free-form description text.
    description: str = ""
    # Names listed under the feature's ``dependencies`` key.
    dependencies: list[str] = field(default_factory=list)
|
||
|
||
|
||
class PackageConfig:
|
||
"""包配置类, 用于解析和管理cbuild.toml配置"""
|
||
|
||
CONFIG_FILE = "cbuild.toml"
|
||
|
||
def __init__(self, config_path: Path):
|
||
package_file = config_path / self.CONFIG_FILE
|
||
if not package_file.exists():
|
||
raise ValueError(f"Package path {package_file} does not exist")
|
||
with open(package_file, "rb") as file:
|
||
raw_config = tomllib.load(file)
|
||
self.config: dict = raw_config.get("package", {})
|
||
self.path: Path = config_path
|
||
|
||
def __str__(self) -> str:
|
||
return pprint.pformat(self.config)
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
"""获取包的名称
|
||
|
||
Returns:
|
||
str: 包的名称,如果未定义则返回空字符串
|
||
"""
|
||
return self.config.get("name", "[unnamed package]")
|
||
|
||
@property
|
||
def version(self) -> str:
|
||
"""获取包的版本
|
||
|
||
Returns:
|
||
str: 包的版本号,如果未定义则返回空字符串
|
||
"""
|
||
return self.config.get("version", "0.0.0")
|
||
|
||
@property
|
||
def default_features(self) -> list[str]:
|
||
"""获取默认启用的特性列表
|
||
|
||
Returns:
|
||
list[str]: 默认特性的名称列表
|
||
"""
|
||
return self.config.get("default_features", [])
|
||
|
||
@property
|
||
def features(self) -> list[Feature]:
|
||
"""获取所有可用特性及其依赖
|
||
|
||
Returns:
|
||
list[Feature]: 特性对象列表
|
||
"""
|
||
features_data = self.config.get("features", {})
|
||
features = []
|
||
for feature in features_data:
|
||
if isinstance(feature, str):
|
||
feature = Feature(name=feature)
|
||
elif isinstance(feature, dict):
|
||
name = feature.get("name", None)
|
||
if name is None:
|
||
continue
|
||
feature = Feature(
|
||
name=name,
|
||
description=feature.get("description", ""),
|
||
dependencies=feature.get("dependencies", []),
|
||
)
|
||
features.append(feature)
|
||
return features
|
||
|
||
@property
|
||
def dependencies(self) -> list[Dependency]:
|
||
"""获取包的依赖列表
|
||
|
||
Returns:
|
||
list[Dependency]: 依赖对象列表
|
||
"""
|
||
deps_data = self.config.get("dependencies", [])
|
||
dependencies = []
|
||
for dep_dict in deps_data:
|
||
if isinstance(dep_dict, dict):
|
||
dependency = Dependency(
|
||
name=dep_dict.get("name", ""),
|
||
path=dep_dict.get("path", ""),
|
||
version=dep_dict.get("version", "0.0.0"),
|
||
optional=dep_dict.get("optional", False),
|
||
)
|
||
dependencies.append(dependency)
|
||
return dependencies
|
||
|
||
@property
|
||
def authors(self) -> list[str]:
|
||
"""获取作者列表
|
||
|
||
Returns:
|
||
list[str]: 作者列表
|
||
"""
|
||
return self.config.get("authors", [])
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
"""获取包的描述
|
||
|
||
Returns:
|
||
str: 包的描述文本
|
||
"""
|
||
return self.config.get("description", "")
|
||
|
||
|
||
class DependencyResolver:
    """Resolve the transitive dependencies of a root package.

    Packages are identified by the name in their own config file. The
    resolver records every package it reaches (``deps``), the direct
    dependency names of each package (``dep_map``), and a topological order
    with dependencies before their dependents.
    """

    def __init__(self, root_package: PackageConfig):
        """Create an unresolved resolver for *root_package*."""
        self.root_package = root_package
        self.deps: dict[str, PackageConfig] = {}
        self.deps_graph: TopologicalSorter = TopologicalSorter()
        self.dep_map: dict[str, list[str]] = {}
        # Topological order computed once by resolve().
        self._sorted_names: list[str] = []
        self._resolved = False

    def resolve(self) -> dict[str, PackageConfig]:
        """Resolve all dependencies breadth-first, starting at the root.

        The work is done once; later calls return the cached result.

        Returns:
            dict[str, PackageConfig]: Copy of the name -> config mapping for
            the root package and everything reachable from it.

        Raises:
            ValueError: If a dependency directory has no config file.
            graphlib.CycleError: If the dependency graph contains a cycle.
        """
        if self._resolved:
            return self.deps.copy()

        queue = [self.root_package]
        visited = set()

        while queue:
            pkg_config = queue.pop(0)
            pkg_name = pkg_config.name
            if pkg_name in visited:
                continue
            visited.add(pkg_name)

            self.deps.setdefault(pkg_name, pkg_config)
            self.dep_map.setdefault(pkg_name, [])

            for dep in pkg_config.dependencies:
                # Dependency paths are relative to the declaring package.
                dep_config = PackageConfig(Path(pkg_config.path / dep.path))
                dep_name = dep_config.name

                self.deps_graph.add(pkg_name, dep_name)
                self.dep_map[pkg_name].append(dep_name)

                if dep_name not in visited and dep_name not in self.deps:
                    queue.append(dep_config)

        # static_order() prepares the graph itself (raising CycleError on a
        # cycle). Computing the order here, exactly once, avoids a second
        # prepare() later, which raises ValueError on interpreters where the
        # sorter may only be prepared once.
        self._sorted_names = list(self.deps_graph.static_order())
        self._resolved = True
        return self.deps.copy()

    def get_dependencies_of(self, package_name: str) -> list[str]:
        """Return the direct dependency names of *package_name*.

        Args:
            package_name (str): Name of a package.

        Returns:
            list[str]: A fresh list of direct dependency names; empty when
            the package is unknown or has no dependencies.
        """
        # Return a copy so callers cannot modify the resolver's own record.
        return list(self.dep_map.get(package_name, []))

    # The return annotation is a string because CBuildContext is defined
    # further down the file; an unquoted name would be looked up while this
    # class is still being created.
    def get_sorted_dependencies(self) -> list["CBuildContext"]:
        """Return build contexts for all dependencies in topological order.

        The root package itself is excluded; dependencies come before the
        packages that depend on them.
        """
        if not self._resolved:
            self.resolve()

        root_name = self.root_package.name
        return [
            CBuildContext(self.deps[name])
            for name in self._sorted_names
            if name != root_name and name in self.deps
        ]

    def get_all_contexts(self) -> list["CBuildContext"]:
        """Return a build context for every resolved package, root included."""
        if not self._resolved:
            self.resolve()

        return [CBuildContext(resolved_dep) for resolved_dep in self.deps.values()]
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class BuildPath:
|
||
"""path"""
|
||
|
||
root_path: Path
|
||
tests_path: Path
|
||
output_path: Path
|
||
object_path: Path
|
||
src_path: Path
|
||
inc_path: Path
|
||
default_bin_path: Path
|
||
default_lib_path: Path
|
||
|
||
@classmethod
|
||
def from_root_path(cls, root_path: Path) -> Self:
|
||
"""_summary_
|
||
|
||
Args:
|
||
output_path (Path): _description_
|
||
|
||
Returns:
|
||
Self: _description_
|
||
"""
|
||
src_path = root_path / "src"
|
||
output_path = root_path / "build"
|
||
return cls(
|
||
root_path=root_path,
|
||
tests_path=root_path / "tests",
|
||
inc_path=root_path / "include",
|
||
output_path=output_path,
|
||
object_path=output_path / "obj",
|
||
src_path=src_path,
|
||
default_bin_path=src_path / "main.c",
|
||
default_lib_path=src_path / "lib.c",
|
||
)
|
||
|
||
|
||
class TargetType(Enum):
    """Kinds of build targets."""

    MAIN_EXECUTABLE = auto()
    EXECUTABLE = auto()
    TEST_EXECUTABLE = auto()
    STATIC_LIBRARY = auto()
    SHARED_LIBRARY = auto()
|
||
|
||
|
||
@dataclass
class Target:
    """Description of one build target."""

    # Target name.
    name: str
    # Kind of target.
    type: TargetType
    # Entry source file of the target.
    source: Path
    # Object file produced from ``source``.
    object: Path
    # Final artifact path.
    output: Path
|
||
|
||
|
||
class CBuildContext:
|
||
"""构建上下文,管理所有包的信息"""
|
||
|
||
def __init__(
|
||
self,
|
||
package: PackageConfig,
|
||
build_path: BuildPath | None = None,
|
||
need_cache: bool = True,
|
||
):
|
||
self.package = package
|
||
self.relative_path = Path.cwd()
|
||
self.path = (
|
||
BuildPath.from_root_path(package.path) if build_path is None else build_path
|
||
)
|
||
self.deps_resolver = DependencyResolver(package)
|
||
|
||
def _path_collection(self, path: list[Path]) -> list[Path]:
|
||
p = [str(p.resolve()) for p in path if p.exists()]
|
||
unique_paths = set(p)
|
||
return [
|
||
Path(p).relative_to(self.relative_path, walk_up=True) for p in unique_paths
|
||
]
|
||
|
||
@property
|
||
def sources_path(self) -> list[Path]:
|
||
"""获取所有包的源文件路径"""
|
||
return list(self.path.src_path.glob("**/*.c"))
|
||
|
||
@property
|
||
def tests_path(self) -> list[Path]:
|
||
"""获取所有测试源文件路径 eg. `tests/test_*.c`"""
|
||
test_sources = []
|
||
test_path = self.path.root_path / "tests"
|
||
if test_path.exists():
|
||
for file in test_path.glob("test_*.c"):
|
||
test_sources.append(file)
|
||
return test_sources
|
||
|
||
@property
|
||
def includes(self) -> list[Path]:
|
||
"""获取包的包含路径"""
|
||
deps = self.deps_resolver.get_all_contexts()
|
||
includes = [inc.path.inc_path for inc in deps if inc.path.inc_path.exists()]
|
||
return self._path_collection(includes)
|
||
|
||
def get_targets(self) -> list[Target]:
|
||
"""获取所有构建目标"""
|
||
targets = []
|
||
execute_ext = ""
|
||
if sys.platform == "win32":
|
||
execute_ext = ".exe"
|
||
|
||
# 添加主可执行文件目标
|
||
if self.path.default_bin_path.exists():
|
||
targets.append(
|
||
Target(
|
||
name=self.package.name,
|
||
type=TargetType.MAIN_EXECUTABLE,
|
||
source=self.path.default_bin_path,
|
||
object=self.get_object_path(self.path.default_bin_path),
|
||
output=self.path.output_path / f"{self.package.name}{execute_ext}",
|
||
)
|
||
)
|
||
|
||
# 添加静态库目标
|
||
if self.path.default_lib_path.exists():
|
||
targets.append(
|
||
Target(
|
||
name=f"lib{self.package.name}",
|
||
type=TargetType.STATIC_LIBRARY,
|
||
source=self.path.default_lib_path,
|
||
object=self.get_object_path(self.path.default_lib_path),
|
||
output=self.path.output_path / f"lib{self.package.name}.a",
|
||
)
|
||
)
|
||
|
||
# 添加测试目标
|
||
if self.path.tests_path.exists():
|
||
for test_source in self.tests_path:
|
||
targets.append(
|
||
Target(
|
||
name=test_source.stem,
|
||
type=TargetType.TEST_EXECUTABLE,
|
||
source=test_source,
|
||
object=self.get_object_path(test_source),
|
||
output=self.path.output_path
|
||
/ f"{test_source.stem}{execute_ext}",
|
||
)
|
||
)
|
||
|
||
return targets
|
||
|
||
def get_self_build_objs(self) -> list[Path]:
|
||
"""获取当前包的源文件路径"""
|
||
objs = []
|
||
for path in self.sources_path:
|
||
if path == self.path.default_bin_path:
|
||
continue
|
||
if path == self.path.default_lib_path:
|
||
continue
|
||
objs.append(path)
|
||
return objs
|
||
|
||
def get_object_path(self, source_path: Path) -> Path:
|
||
"""将源文件路径映射到对象文件路径
|
||
|
||
Args:
|
||
source_path (Path): 源文件路径(.c文件)
|
||
|
||
Returns:
|
||
Path: 对应的对象文件路径(.o文件)
|
||
"""
|
||
# 确保输出目录存在
|
||
objects_dir = self.path.object_path
|
||
objects_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
try:
|
||
# 尝试生成相对于根目录的路径结构,避免冲突
|
||
relative_path = source_path.relative_to(self.path.root_path)
|
||
# 将路径分隔符替换为特殊字符,防止文件系统问题
|
||
safe_relative_path = str(relative_path).replace("/", "_").replace("\\", "_")
|
||
object_path = objects_dir / Path(safe_relative_path).with_suffix(".o")
|
||
except ValueError:
|
||
# 如果源文件完全不在项目目录下,则使用完整路径哈希
|
||
full_path_str = str(source_path.absolute())
|
||
path_hash = hashlib.md5(full_path_str.encode()).hexdigest()[:8]
|
||
object_filename = f"{source_path.stem}_{path_hash}{source_path.suffix}.o"
|
||
object_path = objects_dir / object_filename
|
||
|
||
# 确保对象文件的目录存在
|
||
object_path.parent.mkdir(parents=True, exist_ok=True)
|
||
return object_path
|
||
|
||
def get_build_objs(self) -> list[tuple[Path, Path]]:
|
||
"""获取所有需要编译的源文件及目标文件(排除主文件即main.c, lib.c, bin/*.c)"""
|
||
deps = self.deps_resolver.get_all_contexts()
|
||
objs = []
|
||
for dep in deps:
|
||
objs.extend(dep.get_self_build_objs())
|
||
self._path_collection(objs)
|
||
|
||
return [
|
||
(source_path, self.get_object_path(source_path)) for source_path in objs
|
||
]
|
||
|
||
|
||
@dataclass
class CacheEntry:
    """Cached state of one compiled source file."""

    # Modification time of the source when the entry was recorded.
    mtime: float
    # Content hash of the source when the entry was recorded.
    hash: str
    # Path of the object file, stored as a string.
    obj_path: str
|
||
|
||
|
||
class BuildCache:
|
||
"""构建缓存管理器"""
|
||
|
||
def __init__(self, build_path: Path):
|
||
self.cache_dir = build_path / Path("cache")
|
||
self.cache_dir.mkdir(parents=True, exist_ok=True)
|
||
self.cache_file = self.cache_dir / Path("cache.json")
|
||
self.cache: dict[str, CacheEntry] = {}
|
||
|
||
def load(self):
|
||
"""加载缓存数据"""
|
||
if not self.cache_file.exists() or self.cache_file.stat().st_size == 0:
|
||
self.cache_file.touch()
|
||
self.cache = {}
|
||
return
|
||
with self.cache_file.open("r", encoding="utf-8") as f:
|
||
data: dict = json.load(f)
|
||
self.cache = {key: CacheEntry(**entry_data) for key, entry_data in data.items()}
|
||
|
||
def clear(self):
|
||
"""清空缓存数据"""
|
||
self.cache.clear()
|
||
|
||
def save(self):
|
||
"""保存缓存数据"""
|
||
serializable_cache = {key: asdict(entry) for key, entry in self.cache.items()}
|
||
with self.cache_file.open("w", encoding="utf-8") as f:
|
||
json.dump(serializable_cache, f)
|
||
|
||
def _calculate_file_hash(self, file_path: Path) -> str:
|
||
"""计算文件内容哈希"""
|
||
hash_md5 = hashlib.md5()
|
||
with open(file_path, "rb") as f:
|
||
hashlib.file_digest(f, "md5")
|
||
return hash_md5.hexdigest()
|
||
|
||
def needs_rebuild(self, source_path: Path, object_path: Path) -> bool:
|
||
"""检查是否需要重新编译
|
||
TODO 头文件并不会被计算
|
||
"""
|
||
source_key = str(source_path)
|
||
|
||
# 如果没有缓存记录,需要重新编译
|
||
if source_key not in self.cache:
|
||
return True
|
||
|
||
cache_entry = self.cache[source_key]
|
||
|
||
# 检查源文件是否更新
|
||
if source_path.stat().st_mtime > cache_entry.mtime:
|
||
return True
|
||
|
||
# 检查对象文件是否存在
|
||
if not object_path.exists():
|
||
return True
|
||
|
||
if cache_entry.hash != self._calculate_file_hash(source_path):
|
||
return True
|
||
|
||
return False
|
||
|
||
def update_cache(self, source_path: Path, object_path: Path):
|
||
"""更新缓存记录"""
|
||
source_key = str(source_path)
|
||
|
||
self.cache[source_key] = CacheEntry(
|
||
mtime=source_path.stat().st_mtime,
|
||
hash=self._calculate_file_hash(source_path),
|
||
obj_path=str(object_path.absolute()),
|
||
)
|
||
|
||
|
||
class CompilerBuildMode(Enum):
    """Build modes a compiler can provide default flags for."""

    TEST = "test"
    DEV = "dev"
    DEBUG = "debug"
    RELEASE = "release"
    NONE = "none"
|
||
|
||
|
||
class Compiler(ABC):
    """Abstract compiler driver with optional recording of executed commands."""

    def __init__(self):
        self.recorded_commands: list[str] = []
        self.should_record = False

    def enable_recording(self, enable=True):
        """Switch command recording on or off.

        Switching it on also discards previously recorded commands.
        """
        self.should_record = enable
        if enable:
            self.recorded_commands.clear()

    def get_recorded_commands(self):
        """Return a copy of the recorded command strings."""
        return list(self.recorded_commands)

    def get_default_flags(self, mode: CompilerBuildMode) -> list[str]:
        """Return the default flags for *mode*.

        This base implementation provides none: it logs a debug message and
        returns an empty list.
        """
        logger.debug("get default flags for mode: %s is not supported", mode.name)
        return []

    @abstractmethod
    def compile(
        self, sources: Path, output: Path, includes: list[Path], flags: list[str]
    ):
        """Compile one source file into an object file."""

    @abstractmethod
    def link(
        self, objects: list[Path], libraries: list[str], flags: list[str], output: Path
    ):
        """Link object files into an output artifact."""

    def cmd(self, cmd: str | list[str]):
        """Run *cmd* and re-raise on a non-zero exit status.

        The command is recorded first when recording is enabled. A failing
        command is logged at fatal level before
        ``subprocess.CalledProcessError`` propagates to the caller.
        """
        if self.should_record:
            recorded = " ".join(cmd) if isinstance(cmd, list) else cmd
            self.recorded_commands.append(recorded)
        try:
            shown = " ".join(cmd) if isinstance(cmd, list) else cmd
            logger.debug("command: `%s`", shown)
            subprocess.run(cmd, check=True)
        except subprocess.CalledProcessError as e:
            failed = e.cmd
            cmd_str = " ".join(failed) if isinstance(failed, list) else failed
            logger.fatal("command running error: [%d]`%s`", e.returncode, cmd_str)
            raise
|
||
|
||
|
||
class ClangCompiler(Compiler):
    """Compiler driver that invokes ``clang``."""

    def compile(
        self, sources: Path, output: Path, includes: list[Path], flags: list[str]
    ):
        """Compile *sources* into the object file *output*.

        Argument order: flags, ``-c <source> -o <output>``, then one ``-I``
        option per include directory.
        """
        argv = [
            "clang",
            *flags,
            "-c",
            str(sources),
            "-o",
            str(output),
            *(f"-I{inc}" for inc in includes),
        ]
        self.cmd(argv)

    def link(
        self, objects: list[Path], libraries: list[str], flags: list[str], output: Path
    ):
        """Link *objects* and *libraries* into *output*.

        Argument order: flags, ``-o <output>``, object files, libraries.
        """
        argv = [
            "clang",
            *flags,
            "-o",
            str(output),
            *(str(obj) for obj in objects),
            *libraries,
        ]
        self.cmd(argv)
|
||
|
||
|
||
class GccCompiler(Compiler):
    """Compiler driver that invokes ``gcc``."""

    def __init__(self) -> None:
        super().__init__()
        warnings = ["-Wall", "-Wextra"]
        # Default flags for every build mode.
        self.default_flags = {
            CompilerBuildMode.NONE: [],
            CompilerBuildMode.TEST: ["-DTEST_MODE", "-O2", "-g", "--coverage", *warnings],
            CompilerBuildMode.DEV: ["-DDEV_MODE", "-O0", "-g", *warnings],
            CompilerBuildMode.DEBUG: [
                "-DDEBUG_MODE",
                "-O0",
                "-g",
                "-fsanitize=address",
                "-fsanitize=undefined",
                "-fno-omit-frame-pointer",
                *warnings,
                "-Werror",
            ],
            CompilerBuildMode.RELEASE: ["-O2", "-flto"],
        }

    def get_default_flags(self, mode: CompilerBuildMode) -> list[str]:
        """Return the default flag list registered for *mode*."""
        return self.default_flags[mode]

    def compile(
        self, sources: Path, output: Path, includes: list[Path], flags: list[str]
    ):
        """Compile *sources* into the object file *output*.

        Argument order: flags, ``-c <source> -o <output>``, then one ``-I``
        option per include directory.
        """
        argv = [
            "gcc",
            *flags,
            "-c",
            str(sources),
            "-o",
            str(output),
            *(f"-I{inc}" for inc in includes),
        ]
        self.cmd(argv)

    def link(
        self, objects: list[Path], libraries: list[str], flags: list[str], output: Path
    ):
        """Link *objects* and *libraries* into *output*.

        Argument order: flags, ``-o <output>``, object files, libraries.
        """
        argv = [
            "gcc",
            *flags,
            "-o",
            str(output),
            *(str(obj) for obj in objects),
            *libraries,
        ]
        self.cmd(argv)
|
||
|
||
|
||
class CPackageBuilder:
    """Builds, runs, tests, cleans and inspects one package."""

    def __init__(
        self,
        package_path: Path,
        compiler: Compiler,
        mode: CompilerBuildMode = CompilerBuildMode.DEV,
        need_cache: bool = True,
    ):
        """Load the package at *package_path* and prepare the build state.

        Args:
            package_path (Path): Directory containing the package config.
            compiler (Compiler): Compiler driver used to compile and link.
            mode (CompilerBuildMode): Build mode that selects default flags.
            need_cache (bool): When false, every source is always compiled.

        Raises:
            ValueError: If the package config file does not exist.
        """
        self.package = PackageConfig(package_path)
        self.context = CBuildContext(self.package, None)
        self.compiler: Compiler = compiler
        # FIXME hack context
        self.cache = BuildCache(self.context.path.output_path) if need_cache else None
        self.mode = mode
        self.global_flags = self.compiler.get_default_flags(mode)

    def _compile(
        self, sources: Path, output: Path, includes: list[Path], flags: list[str]
    ) -> None:
        """Compile *sources* unless the cache says the object is up to date.

        Without a cache the file is always compiled.
        """
        if self.cache is None:
            self.compiler.compile(sources, output, includes, flags)
            return
        if self.cache.needs_rebuild(sources, output):
            self.compiler.compile(sources, output, includes, flags)
            self.cache.update_cache(sources, output)
        else:
            logger.debug("Skipping %s", sources)

    def _build_ctx(self) -> list[Path]:
        """Compile all non-entry sources and return their object files."""
        self.context.path.output_path.mkdir(parents=True, exist_ok=True)
        self.context.path.object_path.mkdir(parents=True, exist_ok=True)
        path_map = self.context.get_build_objs()

        # The include list is the same for every source; compute it once.
        includes = self.context.includes
        for src, obj in path_map:
            self._compile(src, obj, includes, self.global_flags)
        return [obj for _, obj in path_map]

    def _format_size(self, size_bytes):
        """Format a byte count as ``B`` / ``KB`` / ``MB`` / ``GB`` text."""
        if size_bytes == 0:
            return "0B"

        size_names = ["B", "KB", "MB", "GB"]
        i = 0
        while size_bytes >= 1024 and i < len(size_names) - 1:
            size_bytes /= 1024.0
            i += 1

        if i == 0:
            return f"{int(size_bytes)}{size_names[i]}"
        return f"{size_bytes:.1f}{size_names[i]}"

    def build(self, targets_type: list[TargetType]):
        """Build the package.

        All non-entry sources are compiled first. Then every main or test
        executable whose type is listed in *targets_type* is compiled and
        linked against those objects. Other target types are not built by
        this method.

        Args:
            targets_type (list[TargetType]): Target types to build.

        Raises:
            subprocess.CalledProcessError: If a compiler command fails.
        """
        start_time = time.time()
        if self.cache is not None:
            self.cache.load()

        linkable = (TargetType.MAIN_EXECUTABLE, TargetType.TEST_EXECUTABLE)
        try:
            object_files = self._build_ctx()
            includes = self.context.includes
            for target in self.context.get_targets():
                if target.type not in targets_type or target.type not in linkable:
                    continue
                self._compile(
                    target.source, target.object, includes, self.global_flags
                )
                # Each executable links the shared objects plus its own
                # entry object only.
                self.compiler.link(
                    [*object_files, target.object], [], self.global_flags, target.output
                )
        finally:
            # Save even after a failure so files that did compile are not
            # compiled again on the next run.
            if self.cache is not None:
                self.cache.save()

        elapsed_time = time.time() - start_time
        time_str = (
            f"{elapsed_time:.2f}s" if elapsed_time < 60 else f"{elapsed_time / 60:.1f}m"
        )
        logger.info("Finished %s target(s) in %s", self.mode.value, time_str)

    def run(self):
        """Run the package's main executable.

        Logs an error and returns when there is not exactly one main
        executable target.
        """
        targets = [
            target
            for target in self.context.get_targets()
            if target.type == TargetType.MAIN_EXECUTABLE
        ]
        if len(targets) != 1:
            logger.error("not have target to run")
            return
        subprocess.run(targets[0].output, check=False)

    def clean(self):
        """Delete the build output directory and report what was removed."""
        output_path = self.context.path.output_path
        if not output_path.exists():
            logger.info("没有找到构建目录,无需清理")
            return

        # Count files and sizes before the directory is removed.
        cleaned_files = 0
        total_size = 0
        for file_path in output_path.rglob("*"):
            if file_path.is_file():
                cleaned_files += 1
                total_size += file_path.stat().st_size
        shutil.rmtree(output_path)
        logger.info("已清理构建目录: %s", output_path)
        logger.info(
            "Cleaned %s files, Size %s",
            cleaned_files,
            self._format_size(total_size),
        )

    def tree(self):
        """Print the dependency tree of the package to stdout."""
        print("dependency tree:")

        resolver = self.context.deps_resolver
        deps = resolver.resolve()
        root_pkg = resolver.root_package

        def print_tree(pkg_name, prefix="", is_last=True):
            """Print *pkg_name* and, recursively, its dependencies."""
            pkg_config = deps[pkg_name]

            if pkg_name == root_pkg.name:
                print(f"{pkg_config.name} v{pkg_config.version}")
                child_prefix = ""
            else:
                connector = "└── " if is_last else "├── "
                print(f"{prefix}{connector}{pkg_config.name} v{pkg_config.version}")
                # The continuation is as wide as the connector (4 columns)
                # so nested levels line up under their parent.
                child_prefix = prefix + ("    " if is_last else "│   ")

            dependencies = resolver.get_dependencies_of(pkg_name)
            for i, dep_name in enumerate(dependencies):
                is_last_child = i == len(dependencies) - 1
                print_tree(dep_name, child_prefix, is_last_child)

        print_tree(root_pkg.name)

    def tests(self):
        """Run every test executable and print a pass/fail summary.

        A test passes when it exits with status 0. It fails on a non-zero
        exit status, after a 30 second timeout, or when the executable
        cannot be started.
        """
        targets = [
            target
            for target in self.context.get_targets()
            if target.type == TargetType.TEST_EXECUTABLE
        ]
        passed = 0
        failed = 0
        for target in targets:
            name = target.name
            logger.info("运行测试: %s", name)
            logger.debug("test run %s", target.output)
            try:
                # check=False: a failing test is reported through its return
                # code below instead of being raised as an exception.
                result = subprocess.run(
                    target.output,
                    check=False,
                    timeout=30,
                )
            except subprocess.TimeoutExpired:
                print(f" ✗ 测试 {name} 超时")
                failed += 1
                continue
            except (subprocess.SubprocessError, OSError) as e:
                print(f" ✗ 测试 {name} 运行异常: {e}")
                failed += 1
                continue

            if result.returncode == 0:
                print(f" ✓ 测试 {name} 通过")
                passed += 1
            else:
                print(f" ✗ 测试 {name} 失败")
                # Output is not captured, so these are normally empty.
                if result.stdout:
                    print(f" 输出: {result.stdout}")
                if result.stderr:
                    print(f" 错误: {result.stderr}")
                failed += 1
        print(f"\n测试结果: {passed} 通过, {failed} 失败")
|
||
|
||
|
||
def setup_argparse():
    """Build the command-line parser with global options and subcommands.

    Returns:
        argparse.ArgumentParser: Parser whose namespace carries ``compiler``,
        ``verbose``, ``path``, ``record`` and ``command`` (the chosen
        subcommand, or ``None`` when omitted).
    """
    root = argparse.ArgumentParser(
        description="Simple C Package Manager", prog="cbuild"
    )

    # Options shared by all subcommands.
    root.add_argument(
        "--compiler",
        "-c",
        choices=["gcc", "clang", "smcc"],
        default="gcc",
        help="Compiler to use (default: gcc)",
    )
    root.add_argument(
        "--verbose",
        "-V",
        action="store_true",
        help="Enable verbose logging (default: false)",
    )
    root.add_argument(
        "--path",
        "-p",
        default=".",
        help="Path to the package (default: current directory)",
    )
    root.add_argument(
        "--record",
        "-r",
        action="store_true",
        help="Record the compiler command and output to stdout",
    )

    commands = root.add_subparsers(
        dest="command", help="Available commands", metavar="COMMAND"
    )

    commands.add_parser("build", help="Build the project")

    run_cmd = commands.add_parser("run", help="Build and run the project")
    run_cmd.add_argument(
        "--args", nargs=argparse.REMAINDER, help="Arguments to pass to the program"
    )

    test_cmd = commands.add_parser("test", help="Build and run tests")
    test_cmd.add_argument("--filter", help="Filter tests by name pattern")

    clean_cmd = commands.add_parser("clean", help="Clean build artifacts")
    clean_cmd.add_argument(
        "--all", action="store_true", help="Clean all including cache"
    )

    commands.add_parser("tree", help="Show dependency tree")

    return root
|
||
|
||
|
||
def main():
|
||
"""main function with hierarchical command processing"""
|
||
parser = setup_argparse()
|
||
args = parser.parse_args()
|
||
|
||
# 设置日志级别
|
||
if args.verbose:
|
||
logger.setLevel(logging.DEBUG)
|
||
else:
|
||
logger.setLevel(logging.INFO)
|
||
|
||
# 初始化编译器
|
||
match args.compiler:
|
||
case "gcc":
|
||
compiler = GccCompiler()
|
||
case "clang":
|
||
compiler = ClangCompiler()
|
||
case "smcc":
|
||
# TODO self compiler
|
||
raise ValueError("Invalid compiler")
|
||
case _:
|
||
raise ValueError("Invalid compiler")
|
||
|
||
# 启用命令记录
|
||
if args.record:
|
||
compiler.enable_recording()
|
||
|
||
# 创建构建器
|
||
package_path = Path.cwd() / Path(args.path)
|
||
builder = CPackageBuilder(package_path, compiler)
|
||
|
||
# 处理子命令
|
||
match args.command:
|
||
case "build":
|
||
builder.build(
|
||
[
|
||
TargetType.MAIN_EXECUTABLE,
|
||
TargetType.TEST_EXECUTABLE,
|
||
TargetType.STATIC_LIBRARY,
|
||
TargetType.SHARED_LIBRARY,
|
||
TargetType.EXECUTABLE,
|
||
]
|
||
)
|
||
case "run":
|
||
bin_path = builder.context.path.default_bin_path
|
||
if not bin_path.exists():
|
||
logger.error("%s not exist", bin_path)
|
||
return
|
||
builder.build([TargetType.MAIN_EXECUTABLE])
|
||
builder.run()
|
||
case "test":
|
||
builder.build([TargetType.TEST_EXECUTABLE])
|
||
builder.tests()
|
||
case "clean":
|
||
builder.clean()
|
||
case "tree":
|
||
builder.tree()
|
||
case _:
|
||
logger.error("unknown command: %s", args.command)
|
||
|
||
# 输出记录的命令(如果有)
|
||
if args.record:
|
||
cmds = compiler.get_recorded_commands()
|
||
logger.info("Recorded compiler commands: [len is %s]", len(cmds))
|
||
for cmd in cmds:
|
||
print(cmd)
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: run the command-line interface.
    main()
|