"""cbuild.py""" from abc import ABC, abstractmethod import tomllib import pprint import subprocess from pathlib import Path from dataclasses import asdict, dataclass, field from enum import Enum, auto import logging import hashlib from graphlib import TopologicalSorter import shutil import argparse from typing import Self import time import sys import json class ColorFormatter(logging.Formatter): """颜色格式化""" # 基础颜色定义(对应ANSI颜色码) cyan = "\x1b[36m" # DEBUG - 青色 green = "\x1b[32m" # INFO - 绿色 blue = "\x1b[34m" # TRACE - 蓝色 yellow = "\x1b[33m" # WARN - 黄色 red = "\x1b[31m" # ERROR - 红色 line_red = "\x1b[31;4m" # FATAL - 红色加下划线 bold = "\x1b[1m" # 粗体 reset = "\x1b[0m" # 重置 fmt = "%(name)s - %(levelname)s: " time = "%(asctime)s" msg = "%(message)s" file = " - (%(filename)s:%(lineno)d)" FORMATS = { logging.DEBUG: bold + cyan + fmt + reset + msg + file, logging.INFO: bold + green + fmt + reset + msg, logging.WARNING: bold + yellow + fmt + reset + msg, logging.ERROR: bold + red + fmt + reset + msg, logging.CRITICAL: bold + line_red + fmt + reset + msg + file, logging.FATAL: bold + line_red + fmt + reset + msg + file, } def format(self, record): log_fmt = self.FORMATS.get(record.levelno) formatter = logging.Formatter(log_fmt) return formatter.format(record) def setup_logger() -> logging.Logger: """设置日志记录器""" _logger = logging.getLogger(__name__) console_handler = logging.StreamHandler() console_handler.setFormatter(ColorFormatter()) _logger.addHandler(console_handler) return _logger logger = setup_logger() @dataclass class Dependency: """依赖配置""" name: str path: str version: str = "0.0.0" optional: bool = False @dataclass class Feature: """特性配置""" name: str description: str = "" dependencies: list[str] = field(default_factory=list) class PackageConfig: """包配置类, 用于解析和管理cbuild.toml配置""" CONFIG_FILE = "cbuild.toml" def __init__(self, config_path: Path): package_file = config_path / self.CONFIG_FILE if not package_file.exists(): raise ValueError(f"Package path {package_file} does not exist") with open(package_file, "rb") as file: raw_config = tomllib.load(file) self.config: dict = raw_config.get("package", {}) self.path: Path = config_path def __str__(self) -> str: return pprint.pformat(self.config) @property def name(self) -> str: """获取包的名称 Returns: str: 包的名称,如果未定义则返回空字符串 """ return self.config.get("name", "[unnamed package]") @property def version(self) -> str: """获取包的版本 Returns: str: 包的版本号,如果未定义则返回空字符串 """ return self.config.get("version", "0.0.0") @property def default_features(self) -> list[str]: """获取默认启用的特性列表 Returns: list[str]: 默认特性的名称列表 """ return self.config.get("default_features", []) @property def features(self) -> list[Feature]: """获取所有可用特性及其依赖 Returns: list[Feature]: 特性对象列表 """ features_data = self.config.get("features", {}) features = [] for feature in features_data: if isinstance(feature, str): feature = Feature(name=feature) elif isinstance(feature, dict): name = feature.get("name", None) if name is None: continue feature = Feature( name=name, description=feature.get("description", ""), dependencies=feature.get("dependencies", []), ) features.append(feature) return features @property def dependencies(self) -> list[Dependency]: """获取包的依赖列表 Returns: list[Dependency]: 依赖对象列表 """ deps_data = self.config.get("dependencies", []) dependencies = [] for dep_dict in deps_data: if isinstance(dep_dict, dict): dependency = Dependency( name=dep_dict.get("name", ""), path=dep_dict.get("path", ""), version=dep_dict.get("version", "0.0.0"), optional=dep_dict.get("optional", False), ) dependencies.append(dependency) return dependencies @property def authors(self) -> list[str]: """获取作者列表 Returns: list[str]: 作者列表 """ return self.config.get("authors", []) @property def description(self) -> str: """获取包的描述 Returns: str: 包的描述文本 """ return self.config.get("description", "") class DependencyResolver: """依赖解析器""" def __init__(self, root_package: PackageConfig): self.root_package = root_package self.resolved_deps: dict[str, PackageConfig] = {} self.deps_graph: TopologicalSorter = TopologicalSorter() self._resolved = False def resolve(self) -> dict[str, PackageConfig]: """解析所有依赖""" if self._resolved: return self.resolved_deps # 使用广度优先搜索解析所有依赖 queue = [self.root_package] visited = set() while queue: pkg_config = queue.pop(0) pkg_name = pkg_config.name if pkg_name in visited: continue visited.add(pkg_name) # 创建构建上下文 if pkg_name not in self.resolved_deps: self.resolved_deps[pkg_name] = pkg_config # 解析直接依赖 for dep in pkg_config.dependencies: dep_path = Path(pkg_config.path / dep.path) dep_config = PackageConfig(dep_path) dep_name = dep_config.name # 添加依赖图关系 self.deps_graph.add(pkg_name, dep_name) # 如果是新依赖,加入队列继续解析 if dep_name not in visited and dep_name not in self.resolved_deps: queue.append(dep_config) self.deps_graph.prepare() self._resolved = True return self.resolved_deps def get_sorted_dependencies(self) -> list[CBuildContext]: """获取按拓扑排序的依赖列表(不包括根包)""" if not self._resolved: self.resolve() sorted_names = list(self.deps_graph.static_order()) return [ CBuildContext(self.resolved_deps[name]) for name in sorted_names if name != self.root_package.name and name in self.resolved_deps ] def get_all_contexts(self) -> list[CBuildContext]: """获取所有上下文(包括根包)""" if not self._resolved: self.resolve() return [ CBuildContext(resolved_dep) for resolved_dep in self.resolved_deps.values() ] @dataclass(frozen=True) class BuildPath: """path""" root_path: Path tests_path: Path output_path: Path object_path: Path src_path: Path inc_path: Path default_bin_path: Path default_lib_path: Path @classmethod def from_root_path(cls, root_path: Path) -> Self: """_summary_ Args: output_path (Path): _description_ Returns: Self: _description_ """ src_path = root_path / "src" output_path = root_path / "build" return cls( root_path=root_path, tests_path=root_path / "tests", inc_path=root_path / "include", output_path=output_path, object_path=output_path / "obj", src_path=src_path, default_bin_path=src_path / "main.c", default_lib_path=src_path / "lib.c", ) class TargetType(Enum): """目标文件类型枚举""" MAIN_EXECUTABLE = auto() EXECUTABLE = auto() TEST_EXECUTABLE = auto() STATIC_LIBRARY = auto() SHARED_LIBRARY = auto() @dataclass class Target: """目标文件信息""" name: str type: TargetType source: Path object: Path output: Path class CBuildContext: """构建上下文,管理所有包的信息""" def __init__( self, package: PackageConfig, build_path: BuildPath | None = None, need_cache: bool = True, ): self.package = package self.relative_path = Path.cwd() self.path = ( BuildPath.from_root_path(package.path) if build_path is None else build_path ) self.deps_resolver = DependencyResolver(package) def _path_collection(self, path: list[Path]) -> list[Path]: p = [str(p.resolve()) for p in path if p.exists()] unique_paths = set(p) return [ Path(p).relative_to(self.relative_path, walk_up=True) for p in unique_paths ] @property def sources_path(self) -> list[Path]: """获取所有包的源文件路径""" return list(self.path.src_path.glob("**/*.c")) @property def tests_path(self) -> list[Path]: """获取所有测试源文件路径 eg. `tests/test_*.c`""" test_sources = [] test_path = self.path.root_path / "tests" if test_path.exists(): for file in test_path.glob("test_*.c"): test_sources.append(file) return test_sources @property def includes(self) -> list[Path]: """获取包的包含路径""" deps = self.deps_resolver.get_all_contexts() includes = [inc.path.inc_path for inc in deps if inc.path.inc_path.exists()] return self._path_collection(includes) def get_targets(self) -> list[Target]: """获取所有构建目标""" targets = [] execute_ext = "" if sys.platform == "win32": execute_ext = ".exe" # 添加主可执行文件目标 if self.path.default_bin_path.exists(): targets.append( Target( name=self.package.name, type=TargetType.MAIN_EXECUTABLE, source=self.path.default_bin_path, object=self.get_object_path(self.path.default_bin_path), output=self.path.output_path / f"{self.package.name}{execute_ext}", ) ) # 添加静态库目标 if self.path.default_lib_path.exists(): targets.append( Target( name=f"lib{self.package.name}", type=TargetType.STATIC_LIBRARY, source=self.path.default_lib_path, object=self.get_object_path(self.path.default_lib_path), output=self.path.output_path / f"lib{self.package.name}.a", ) ) # 添加测试目标 if self.path.tests_path.exists(): for test_source in self.tests_path: targets.append( Target( name=test_source.stem, type=TargetType.TEST_EXECUTABLE, source=test_source, object=self.get_object_path(test_source), output=self.path.output_path / f"{test_source.stem}{execute_ext}", ) ) return targets def get_self_build_objs(self) -> list[Path]: """获取当前包的源文件路径""" objs = [] for path in self.sources_path: if path == self.path.default_bin_path: continue if path == self.path.default_lib_path: continue objs.append(path) return objs def get_object_path(self, source_path: Path) -> Path: """将源文件路径映射到对象文件路径 Args: source_path (Path): 源文件路径(.c文件) Returns: Path: 对应的对象文件路径(.o文件) """ # 确保输出目录存在 objects_dir = self.path.object_path objects_dir.mkdir(parents=True, exist_ok=True) try: # 尝试生成相对于根目录的路径结构,避免冲突 relative_path = source_path.relative_to(self.path.root_path) # 将路径分隔符替换为特殊字符,防止文件系统问题 safe_relative_path = str(relative_path).replace("/", "_").replace("\\", "_") object_path = objects_dir / Path(safe_relative_path).with_suffix(".o") except ValueError: # 如果源文件完全不在项目目录下,则使用完整路径哈希 full_path_str = str(source_path.absolute()) path_hash = hashlib.md5(full_path_str.encode()).hexdigest()[:8] object_filename = f"{source_path.stem}_{path_hash}{source_path.suffix}.o" object_path = objects_dir / object_filename # 确保对象文件的目录存在 object_path.parent.mkdir(parents=True, exist_ok=True) return object_path def get_build_objs(self) -> list[tuple[Path, Path]]: """获取所有需要编译的源文件及目标文件(排除主文件即main.c, lib.c, bin/*.c)""" deps = self.deps_resolver.get_all_contexts() objs = [] for dep in deps: objs.extend(dep.get_self_build_objs()) self._path_collection(objs) return [ (source_path, self.get_object_path(source_path)) for source_path in objs ] @dataclass class CacheEntry: """缓存条目""" mtime: float hash: str obj_path: str class BuildCache: """构建缓存管理器""" def __init__(self, build_path: Path): self.cache_dir = build_path / Path("cache") self.cache_dir.mkdir(parents=True, exist_ok=True) self.cache_file = self.cache_dir / Path("cache.json") self.cache: dict[str, CacheEntry] = {} def load(self): """加载缓存数据""" if not self.cache_file.exists() or self.cache_file.stat().st_size == 0: self.cache_file.touch() self.cache = {} return with self.cache_file.open("r", encoding="utf-8") as f: data: dict = json.load(f) self.cache = {key: CacheEntry(**entry_data) for key, entry_data in data.items()} def clear(self): """清空缓存数据""" self.cache.clear() def save(self): """保存缓存数据""" serializable_cache = {key: asdict(entry) for key, entry in self.cache.items()} with self.cache_file.open("w", encoding="utf-8") as f: json.dump(serializable_cache, f) def _calculate_file_hash(self, file_path: Path) -> str: """计算文件内容哈希""" hash_md5 = hashlib.md5() with open(file_path, "rb") as f: hashlib.file_digest(f, "md5") return hash_md5.hexdigest() def needs_rebuild(self, source_path: Path, object_path: Path) -> bool: """检查是否需要重新编译 TODO 头文件并不会被计算 """ source_key = str(source_path) # 如果没有缓存记录,需要重新编译 if source_key not in self.cache: return True cache_entry = self.cache[source_key] # 检查源文件是否更新 if source_path.stat().st_mtime > cache_entry.mtime: return True # 检查对象文件是否存在 if not object_path.exists(): return True if cache_entry.hash != self._calculate_file_hash(source_path): return True return False def update_cache(self, source_path: Path, object_path: Path): """更新缓存记录""" source_key = str(source_path) self.cache[source_key] = CacheEntry( mtime=source_path.stat().st_mtime, hash=self._calculate_file_hash(source_path), obj_path=str(object_path.absolute()), ) class CompilerBuildMode(Enum): """编译模式""" TEST = "test" DEV = "dev" DEBUG = "debug" RELEASE = "release" NONE = "none" class Compiler(ABC): """编译器抽象类""" def get_default_flags(self, mode: CompilerBuildMode) -> list[str]: """获取指定模式的默认标志""" logger.debug("get default flags for mode: %s is not supported", mode.name) return [] @abstractmethod def compile( self, sources: Path, output: Path, includes: list[Path], flags: list[str] ): """编译源文件""" @abstractmethod def link( self, objects: list[Path], libraries: list[str], flags: list[str], output: Path ): """链接对象文件""" def cmd(self, cmd: str | list[str]): """执行命令并处理错误输出""" try: logger.debug( "command: `%s`", " ".join(cmd) if isinstance(cmd, list) else cmd ) subprocess.run(cmd, check=True) except subprocess.CalledProcessError as e: # 输出详细错误信息帮助调试 cmd_str = " ".join(e.cmd) if isinstance(e.cmd, list) else e.cmd logger.fatal("command running error: [%d]`%s`", e.returncode, cmd_str) raise class ClangCompiler(Compiler): """Clang编译器""" def compile( self, sources: Path, output: Path, includes: list[Path], flags: list[str] ): """编译源文件""" cmd = ["clang"] cmd.extend(flags) cmd.extend(["-c", str(sources), "-o", str(output)]) cmd.extend(f"-I{inc}" for inc in includes) self.cmd(cmd) def link( self, objects: list[Path], libraries: list[str], flags: list[str], output: Path ): """链接对象文件""" cmd = ["clang"] cmd.extend(flags) cmd.extend(["-o", str(output)]) cmd.extend(str(obj) for obj in objects) cmd.extend(lib for lib in libraries) self.cmd(cmd) class GccCompiler(Compiler): """Gcc编译器""" def __init__(self) -> None: super().__init__() self.default_flags = { CompilerBuildMode.NONE: [], CompilerBuildMode.TEST: [ "-DTEST_MODE", "-O2", "-g", "--coverage", "-Wall", "-Wextra", ], CompilerBuildMode.DEV: ["-DDEV_MODE", "-O0", "-g", "-Wall", "-Wextra"], CompilerBuildMode.DEBUG: [ "-DDEBUG_MODE", "-O0", "-g", "-fsanitize=address", "-fsanitize=undefined", "-fno-omit-frame-pointer", "-Wall", "-Wextra", "-Werror", ], CompilerBuildMode.RELEASE: ["-O2", "-flto"], } def get_default_flags(self, mode: CompilerBuildMode) -> list[str]: """获取指定模式的默认标志""" return self.default_flags[mode] def compile( self, sources: Path, output: Path, includes: list[Path], flags: list[str] ): """编译源文件""" cmd = ["gcc"] cmd.extend(flags) cmd.extend(["-c", str(sources), "-o", str(output)]) cmd.extend(f"-I{inc}" for inc in includes) self.cmd(cmd) def link( self, objects: list[Path], libraries: list[str], flags: list[str], output: Path ): """链接对象文件""" cmd = ["gcc"] cmd.extend(flags) cmd.extend(["-o", str(output)]) cmd.extend(str(obj) for obj in objects) cmd.extend(lib for lib in libraries) self.cmd(cmd) class CPackageBuilder: """包构建器""" def __init__( self, package_path: Path, compiler: Compiler, mode: CompilerBuildMode = CompilerBuildMode.DEV, need_cache: bool = True, ): self.package = PackageConfig(package_path) self.context = CBuildContext(self.package, None) self.compiler: Compiler = compiler # FIXME hack context self.cache = BuildCache(self.context.path.output_path) if need_cache else None self.global_flags = self.compiler.get_default_flags(mode) def _compile( self, sources: Path, output: Path, includes: list[Path], flags: list[str] ) -> None: """缓存编译""" if self.cache is not None and self.cache.needs_rebuild(sources, output): self.compiler.compile(sources, output, includes, flags) self.cache.update_cache(sources, output) else: logger.debug("Skipping %s", sources) def _build_ctx(self) -> list[Path]: """构建上下文""" # 确保输出目录存在 self.context.path.output_path.mkdir(parents=True, exist_ok=True) self.context.path.object_path.mkdir(parents=True, exist_ok=True) path_map = self.context.get_build_objs() for src, obj in path_map: self._compile(src, obj, self.context.includes, self.global_flags) return [pair[1] for pair in path_map] def _format_size(self, size_bytes): """格式化文件大小显示""" if size_bytes == 0: return "0B" size_names = ["B", "KB", "MB", "GB"] i = 0 while size_bytes >= 1024 and i < len(size_names) - 1: size_bytes /= 1024.0 i += 1 if i == 0: return f"{int(size_bytes)}{size_names[i]}" return f"{size_bytes:.1f}{size_names[i]}" def build(self, targets_type: list[TargetType]): """构建包""" start_time = time.time() if self.cache is not None: self.cache.load() object_files = self._build_ctx() targets = self.context.get_targets() for target in targets: if target.type not in targets_type: continue match target.type: case TargetType.MAIN_EXECUTABLE: self._compile( target.source, target.object, self.context.includes, self.global_flags, ) object_files.append(target.object) self.compiler.link( object_files, [], self.global_flags, target.output ) object_files.remove(target.object) case TargetType.TEST_EXECUTABLE: self._compile( target.source, target.object, self.context.includes, self.global_flags, ) object_files.append(target.object) self.compiler.link( object_files, [], self.global_flags, target.output ) object_files.remove(target.object) if self.cache is not None: self.cache.save() # 计算构建时间 elapsed_time = time.time() - start_time time_str = ( f"{elapsed_time:.2f}s" if elapsed_time < 60 else f"{elapsed_time / 60:.1f}m" ) # 显示完成信息 logger.info( "\tFinished dev [unoptimized + debuginfo] target(s) in %s", time_str ) def run(self): """运行项目""" targets = [ target for target in self.context.get_targets() if target.type == TargetType.MAIN_EXECUTABLE ] if len(targets) != 1: logger.error("not have target to run") subprocess.run(targets[0].output, check=False) def clean(self): """清理构建产物""" # 统计要删除的文件数量和总大小 cleaned_files = 0 total_size = 0 # 删除整个输出目录,这会清除所有构建产物 if self.context.path.output_path.exists(): # 遍历目录统计文件数量和大小 for file_path in self.context.path.output_path.rglob("*"): if file_path.is_file(): cleaned_files += 1 total_size += file_path.stat().st_size shutil.rmtree(self.context.path.output_path) logger.info("已清理构建目录: %s", self.context.path.output_path) logger.info( "Cleaned %s files, Size %s", cleaned_files, self._format_size(total_size), ) else: logger.info("没有找到构建目录,无需清理") def tree(self): """打印构建树 - 仿照 cargo 的风格""" # TODO 递归显示 print("dependency tree:") # 解析依赖 resolver = self.context.deps_resolver resolver.resolve() # 获取根包 root_pkg = resolver.root_package print(f"{root_pkg.name} v{root_pkg.version}") # 获取依赖图的节点和边 # 这里我们简化处理,只显示直接依赖 for pkg_config in resolver.resolved_deps.values(): if pkg_config.name != root_pkg.name: # 跳过根包 indent = ( "├── " if pkg_config.name != list(resolver.resolved_deps.keys())[-1] else "└── " ) print(f"{indent}{pkg_config.name} v{pkg_config.version}") def tests(self): """运行测试""" targets = [ target for target in self.context.get_targets() if target.type == TargetType.TEST_EXECUTABLE ] passed = 0 failed = 0 for target in targets: name = target.name logger.info("运行测试: %s", name) logger.debug("test run %s", target.output) try: result = subprocess.run( target.output, check=True, # capture_output=True, # text=True, timeout=30, ) if result.returncode == 0: print(f" ✓ 测试 {name} 通过") passed += 1 else: print(f" ✗ 测试 {name} 失败") if result.stdout: print(f" 输出: {result.stdout}") if result.stderr: print(f" 错误: {result.stderr}") failed += 1 except subprocess.TimeoutExpired: print(f" ✗ 测试 {name} 超时") failed += 1 except subprocess.SubprocessError as e: print(f" ✗ 测试 {name} 运行异常: {e}") failed += 1 print(f"\n测试结果: {passed} 通过, {failed} 失败") def cmd(self, cmd: str): """执行命令""" match cmd: case "build": self.build( [ TargetType.MAIN_EXECUTABLE, TargetType.TEST_EXECUTABLE, TargetType.STATIC_LIBRARY, TargetType.SHARED_LIBRARY, TargetType.EXECUTABLE, ] ) case "run": bin_path = self.context.path.default_bin_path if not bin_path.exists(): logger.error("%s not exist", bin_path) return self.build([TargetType.MAIN_EXECUTABLE]) self.run() case "test": self.build([TargetType.TEST_EXECUTABLE]) self.tests() case "clean": self.clean() case "tree": self.tree() case _: logger.error("unknown command: %s", cmd) def main(): """main""" parser = argparse.ArgumentParser( description="Simple C Package Manager", prog="cbuild" ) parser.add_argument( "command", choices=["build", "run", "test", "clean", "tree"], help="Command to execute", ) parser.add_argument( "--compiler", "-c", choices=["gcc", "clang", "smcc"], default="gcc", help="Compiler to use (default: gcc)", ) parser.add_argument( "--verbose", "-V", action="store_true", help="enable the logging to debug (default: false)", ) parser.add_argument( "--path", "-p", default=".", help="Path to the package (default: current directory)", ) args = parser.parse_args() package_path = Path.cwd() / Path(args.path) if args.verbose: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) match args.compiler: case "gcc": compiler = GccCompiler() case "clang": compiler = ClangCompiler() case "smcc": # TODO self compiler raise ValueError("Invalid compiler") case _: raise ValueError("Invalid compiler") builder = CPackageBuilder(package_path, compiler) builder.cmd(args.command) if __name__ == "__main__": # builder = CPackageBuilder(Path("./runtime/libcore/"), ClangCompiler()) # builder.build() main()