diff --git a/main.py b/main.py index 31d3222..d90eb08 100644 --- a/main.py +++ b/main.py @@ -1,34 +1,24 @@ """main""" -import subprocess import argparse from pathlib import Path import yaml -from src.logger import get_logger +from src.certbot import CertbotConfigurator, parse_certbot, register_certbot +from src.logger import get_logger, run_cmd_with_log from src.nginx import NginxConfig, NginxConfigurator logger = get_logger(__name__) # 配置常量 FILE_PATH = Path(__file__).parent -DOCKER_COMPOSE_FILE = Path(FILE_PATH, "docker-compose.yml") NETWORK_NAME = "nginx-net" def validate_docker_network(): """检查Docker网络是否存在""" - try: - result = subprocess.run( - ["docker", "network", "inspect", NETWORK_NAME], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - check=True - ) - return result.returncode == 0 - except subprocess.CalledProcessError: + result = run_cmd_with_log(["docker", "network", "inspect", NETWORK_NAME], logger) + if result.returncode != 0: logger.error("Docker网络 %s 不存在", NETWORK_NAME) return False - except Exception as e: - logger.error("网络检查异常: %s", str(e), exc_info=True) - return False + return True def parse_compose_config(file_path: Path) -> list[NginxConfig]: """解析Docker Compose文件""" @@ -83,9 +73,6 @@ def parse_compose_config(file_path: Path) -> list[NginxConfig]: except yaml.YAMLError as e: logger.error("YAML解析错误: %s", str(e)) return False - except Exception as e: - logger.error("未知错误: %s", str(e), exc_info=True) - return False def parse_yaml_config(file_path: Path) -> tuple[list[NginxConfig], list[NginxConfig]]: """解析conf.yml配置文件""" @@ -126,7 +113,10 @@ def main(): help='只生成配置不实际写入') parser.add_argument('--verbose', action='store_true', help='显示详细调试信息') + + register_certbot(parser) args = parser.parse_args() + parse_certbot(args) # 初始化配置器 conf = NginxConfigurator() diff --git a/src/certbot.py b/src/certbot.py new file mode 100644 index 0000000..e0b0f13 --- /dev/null +++ b/src/certbot.py @@ -0,0 +1,73 @@ +""" +Let's Encrypt 证书申请 +""" +from argparse import ArgumentParser, Namespace +from encodings.punycode import T +import sys +from src.logger import get_logger, run_cmd_with_log +logger = get_logger(__name__) + +class CertbotConfigurator: + """ + Let's Encrypt 证书申请 + """ + def __init__(self, + domain: str = "*.zzyxyz.com", + acme_server: str = "https://acme-v02.api.letsencrypt.org/directory", + hook_auth_script: str = "./dns-hook.sh add", + hook_cleanup_script: str = "./dns-hook.sh remove"): + self.domain = domain + self.acme_server = acme_server + self.hook_auth_script = hook_auth_script + self.hook_cleanup_script = hook_cleanup_script + @staticmethod + def check_certbot() -> bool: + """增强版环境检查""" + try: + # 检查安装和权限 + result = run_cmd_with_log(["certbot", "--version"], logger) + return "certbot" in result.stdout.decode().lower() + except (FileNotFoundError, PermissionError) as e: + logger.error("Certbot 依赖检查失败, 可能需要检查文件是否存在或是否有可执行权限: %s", str(e)) + return False + def run(self): + """runing certbot to get cert""" + # self.check_certbot() + cmd = [ + "certbot", + "certonly", + "--manual", + "--preferred-challenges", "dns", + "--server", self.acme_server, + "--manual-auth-hook", self.hook_auth_script, + "--manual-cleanup-hook", self.hook_cleanup_script, + "-d", self.domain + ] + # 过滤空字符串 + result = run_cmd_with_log([x for x in cmd if x], logger) + if result.returncode != 0: + logger.error("Certbot certonly failed %s", result.stderr) + return False + logger.info("Certbot certonly success") + return True + +def handle_certbot(args): + """证书管理核心逻辑""" + conf = CertbotConfigurator() + if args.run: + conf.run() + return True + +def parse_certbot(args: Namespace): + """parse subcommand args""" + if args.certbot: + args.handler(args) + sys.exit(0) + +def register_certbot(parsers: ArgumentParser): + """注册certbot子命令""" + subparsers = parsers.add_subparsers(dest='certbot', required=False) + parser = subparsers.add_parser('certbot', help='证书管理') + parser.add_argument('--run', action='store_true', help='运行证书申请程序') + parser.add_argument('--domain', required=False, help='目标域名') + parser.set_defaults(handler=handle_certbot) # 直接绑定处理函数 diff --git a/src/logger.py b/src/logger.py index 72db0ff..e512cf9 100644 --- a/src/logger.py +++ b/src/logger.py @@ -2,6 +2,7 @@ 统一日志工具 """ import logging +import subprocess class ColorFormatter(logging.Formatter): """ @@ -32,3 +33,29 @@ def get_logger(name: str | None = None): datefmt='%Y-%m-%d %H:%M:%S')) logger.addHandler(handler) return logger + +def run_cmd_with_log(cmd: list[str], logger: logging.Logger) -> subprocess.CompletedProcess[bytes]: + """ + run cmd with log + """ + logger.debug(f"Running command: {cmd}") + try: + result = subprocess.run( + cmd, + capture_output=True, + stdout=None, + stderr=None, + check=False, + text=True, + timeout=3, + ) + except (subprocess.SubprocessError, FileNotFoundError) as e: + logger.debug(f"Command failed: {e}") + return subprocess.CompletedProcess(cmd, returncode=-1, stdout='', stderr=str(e)) + if result.returncode != 0: + logger.debug(f""" + Command failed with exit code {result.returncode} + stdout: {result.stdout} + stderr: {result.stderr} + """.strip()) + return result diff --git a/src/nginx.py b/src/nginx.py index 35ea6fe..6dcd026 100644 --- a/src/nginx.py +++ b/src/nginx.py @@ -3,18 +3,12 @@ nginx 配置生成工具 """ from dataclasses import asdict, dataclass from datetime import datetime -import logging from pathlib import Path import shutil -import subprocess -from src.logger import get_logger +from src.logger import get_logger, run_cmd_with_log logger = get_logger(__name__) -NGINX_CONF_DIR = Path("nginx/conf/conf.d") -BACKUP_DIR = Path("nginx/conf/backups") -CONTAINER_NAME = "nginx" - @dataclass class NginxConfig: """ @@ -45,7 +39,6 @@ class NginxConfig: proxy_read_timeout: str = "60s" web_socket_proxy: bool = False -@dataclass class NginxConfigurator: """ Nginx配置管理类,封装配置生成、备份、重载等操作 @@ -57,9 +50,11 @@ class NginxConfigurator: logger (logging.Logger): 日志记录器 """ - output_dir: Path = NGINX_CONF_DIR - backup_dir: Path = BACKUP_DIR - logger: logging.Logger = logger + def __init__(self): + self.output_dir: Path = Path("nginx/conf/conf.d") + self.backup_dir: Path = Path("nginx/conf/backups") + self.container_name: str = "nginx" + self.logger = logger def gen_all_config(self, configs: list[NginxConfig]) -> str: """生成Nginx配置内容""" @@ -123,7 +118,7 @@ server {{ shutil.copy(self.output_dir / filename, backup_file) self.logger.info("配置已备份至 %s", backup_file) return backup_file - except Exception as e: + except (OSError, shutil.Error) as e: self.logger.error("备份失败: %s", str(e)) return None @@ -139,8 +134,8 @@ server {{ f.write(res) logger.info("配置已写入 %s", output_file) return True - except Exception as e: - self.logger.error(f"生成文件失败: {str(e)}") + except (OSError, shutil.Error) as e: + self.logger.error("生成文件失败: %s", str(e)) return False def safe_reload(self) -> bool: @@ -151,32 +146,25 @@ server {{ def test_config(self) -> bool: """测试Nginx配置""" - result = subprocess.run( - ["docker", "exec", CONTAINER_NAME, "nginx", "-t"], - capture_output=True, stderr=None, stdout=None, check=False + result = run_cmd_with_log( + ["docker", "exec", self.container_name, "nginx", "-t"], + self.logger ) if result.returncode != 0: self.logger.error("配置测试失败: %s", result.stderr) return False - else: - return True + return True def reload(self) -> bool: """执行配置重载""" - try: - result = subprocess.run( - ["docker", "exec", CONTAINER_NAME, "nginx", "-s", "reload"], - capture_output=True, stderr=None, stdout=None, check=False - ) - if result.returncode != 0: - self.logger.error("配置重载失败: %s", result.stderr) - return False - else: - self.logger.info("配置重载成功") - return True - except subprocess.CalledProcessError as e: - self.logger.error("重载失败: %s", str(e)) + result = run_cmd_with_log( + ["docker", "exec", self.container_name, "nginx", "-s", "reload"], + self.logger) + if result.returncode != 0: + self.logger.error("配置重载失败: %s", result.stderr) return False + self.logger.info("配置重载成功") + return True def rollback(self) -> bool: """回滚到最近的有效配置"""