refactor(main): 重构主要逻辑并添加 Certbot 支持

- 移除 Docker 网络检查逻辑
- 添加 Certbot 配置和注册
- 优化错误处理和日志记录
- 重构 Nginx 配置生成和重载逻辑
This commit is contained in:
ZZY 2025-05-25 16:12:41 +08:00
parent 07d5f3c0ab
commit dc052329cb
4 changed files with 128 additions and 50 deletions

26
main.py
View File

@ -1,34 +1,24 @@
"""main""" """main"""
import subprocess
import argparse import argparse
from pathlib import Path from pathlib import Path
import yaml import yaml
from src.logger import get_logger from src.certbot import CertbotConfigurator, parse_certbot, register_certbot
from src.logger import get_logger, run_cmd_with_log
from src.nginx import NginxConfig, NginxConfigurator from src.nginx import NginxConfig, NginxConfigurator
logger = get_logger(__name__) logger = get_logger(__name__)
# 配置常量 # 配置常量
FILE_PATH = Path(__file__).parent FILE_PATH = Path(__file__).parent
DOCKER_COMPOSE_FILE = Path(FILE_PATH, "docker-compose.yml")
NETWORK_NAME = "nginx-net" NETWORK_NAME = "nginx-net"
def validate_docker_network(): def validate_docker_network():
"""检查Docker网络是否存在""" """检查Docker网络是否存在"""
try: result = run_cmd_with_log(["docker", "network", "inspect", NETWORK_NAME], logger)
result = subprocess.run( if result.returncode != 0:
["docker", "network", "inspect", NETWORK_NAME],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=True
)
return result.returncode == 0
except subprocess.CalledProcessError:
logger.error("Docker网络 %s 不存在", NETWORK_NAME) logger.error("Docker网络 %s 不存在", NETWORK_NAME)
return False return False
except Exception as e: return True
logger.error("网络检查异常: %s", str(e), exc_info=True)
return False
def parse_compose_config(file_path: Path) -> list[NginxConfig]: def parse_compose_config(file_path: Path) -> list[NginxConfig]:
"""解析Docker Compose文件""" """解析Docker Compose文件"""
@ -83,9 +73,6 @@ def parse_compose_config(file_path: Path) -> list[NginxConfig]:
except yaml.YAMLError as e: except yaml.YAMLError as e:
logger.error("YAML解析错误: %s", str(e)) logger.error("YAML解析错误: %s", str(e))
return False return False
except Exception as e:
logger.error("未知错误: %s", str(e), exc_info=True)
return False
def parse_yaml_config(file_path: Path) -> tuple[list[NginxConfig], list[NginxConfig]]: def parse_yaml_config(file_path: Path) -> tuple[list[NginxConfig], list[NginxConfig]]:
"""解析conf.yml配置文件""" """解析conf.yml配置文件"""
@ -126,7 +113,10 @@ def main():
help='只生成配置不实际写入') help='只生成配置不实际写入')
parser.add_argument('--verbose', action='store_true', parser.add_argument('--verbose', action='store_true',
help='显示详细调试信息') help='显示详细调试信息')
register_certbot(parser)
args = parser.parse_args() args = parser.parse_args()
parse_certbot(args)
# 初始化配置器 # 初始化配置器
conf = NginxConfigurator() conf = NginxConfigurator()

73
src/certbot.py Normal file
View File

@ -0,0 +1,73 @@
"""
Let's Encrypt 证书申请
"""
from argparse import ArgumentParser, Namespace
from encodings.punycode import T
import sys
from src.logger import get_logger, run_cmd_with_log
logger = get_logger(__name__)
class CertbotConfigurator:
"""
Let's Encrypt 证书申请
"""
def __init__(self,
domain: str = "*.zzyxyz.com",
acme_server: str = "https://acme-v02.api.letsencrypt.org/directory",
hook_auth_script: str = "./dns-hook.sh add",
hook_cleanup_script: str = "./dns-hook.sh remove"):
self.domain = domain
self.acme_server = acme_server
self.hook_auth_script = hook_auth_script
self.hook_cleanup_script = hook_cleanup_script
@staticmethod
def check_certbot() -> bool:
"""增强版环境检查"""
try:
# 检查安装和权限
result = run_cmd_with_log(["certbot", "--version"], logger)
return "certbot" in result.stdout.decode().lower()
except (FileNotFoundError, PermissionError) as e:
logger.error("Certbot 依赖检查失败, 可能需要检查文件是否存在或是否有可执行权限: %s", str(e))
return False
def run(self):
"""runing certbot to get cert"""
# self.check_certbot()
cmd = [
"certbot",
"certonly",
"--manual",
"--preferred-challenges", "dns",
"--server", self.acme_server,
"--manual-auth-hook", self.hook_auth_script,
"--manual-cleanup-hook", self.hook_cleanup_script,
"-d", self.domain
]
# 过滤空字符串
result = run_cmd_with_log([x for x in cmd if x], logger)
if result.returncode != 0:
logger.error("Certbot certonly failed %s", result.stderr)
return False
logger.info("Certbot certonly success")
return True
def handle_certbot(args):
"""证书管理核心逻辑"""
conf = CertbotConfigurator()
if args.run:
conf.run()
return True
def parse_certbot(args: Namespace):
"""parse subcommand args"""
if args.certbot:
args.handler(args)
sys.exit(0)
def register_certbot(parsers: ArgumentParser):
"""注册certbot子命令"""
subparsers = parsers.add_subparsers(dest='certbot', required=False)
parser = subparsers.add_parser('certbot', help='证书管理')
parser.add_argument('--run', action='store_true', help='运行证书申请程序')
parser.add_argument('--domain', required=False, help='目标域名')
parser.set_defaults(handler=handle_certbot) # 直接绑定处理函数

View File

@ -2,6 +2,7 @@
统一日志工具 统一日志工具
""" """
import logging import logging
import subprocess
class ColorFormatter(logging.Formatter): class ColorFormatter(logging.Formatter):
""" """
@ -32,3 +33,29 @@ def get_logger(name: str | None = None):
datefmt='%Y-%m-%d %H:%M:%S')) datefmt='%Y-%m-%d %H:%M:%S'))
logger.addHandler(handler) logger.addHandler(handler)
return logger return logger
def run_cmd_with_log(cmd: list[str], logger: logging.Logger) -> subprocess.CompletedProcess[bytes]:
"""
run cmd with log
"""
logger.debug(f"Running command: {cmd}")
try:
result = subprocess.run(
cmd,
capture_output=True,
stdout=None,
stderr=None,
check=False,
text=True,
timeout=3,
)
except (subprocess.SubprocessError, FileNotFoundError) as e:
logger.debug(f"Command failed: {e}")
return subprocess.CompletedProcess(cmd, returncode=-1, stdout='', stderr=str(e))
if result.returncode != 0:
logger.debug(f"""
Command failed with exit code {result.returncode}
stdout: {result.stdout}
stderr: {result.stderr}
""".strip())
return result

View File

@ -3,18 +3,12 @@ nginx 配置生成工具
""" """
from dataclasses import asdict, dataclass from dataclasses import asdict, dataclass
from datetime import datetime from datetime import datetime
import logging
from pathlib import Path from pathlib import Path
import shutil import shutil
import subprocess
from src.logger import get_logger from src.logger import get_logger, run_cmd_with_log
logger = get_logger(__name__) logger = get_logger(__name__)
NGINX_CONF_DIR = Path("nginx/conf/conf.d")
BACKUP_DIR = Path("nginx/conf/backups")
CONTAINER_NAME = "nginx"
@dataclass @dataclass
class NginxConfig: class NginxConfig:
""" """
@ -45,7 +39,6 @@ class NginxConfig:
proxy_read_timeout: str = "60s" proxy_read_timeout: str = "60s"
web_socket_proxy: bool = False web_socket_proxy: bool = False
@dataclass
class NginxConfigurator: class NginxConfigurator:
""" """
Nginx配置管理类封装配置生成备份重载等操作 Nginx配置管理类封装配置生成备份重载等操作
@ -57,9 +50,11 @@ class NginxConfigurator:
logger (logging.Logger): 日志记录器 logger (logging.Logger): 日志记录器
""" """
output_dir: Path = NGINX_CONF_DIR def __init__(self):
backup_dir: Path = BACKUP_DIR self.output_dir: Path = Path("nginx/conf/conf.d")
logger: logging.Logger = logger self.backup_dir: Path = Path("nginx/conf/backups")
self.container_name: str = "nginx"
self.logger = logger
def gen_all_config(self, configs: list[NginxConfig]) -> str: def gen_all_config(self, configs: list[NginxConfig]) -> str:
"""生成Nginx配置内容""" """生成Nginx配置内容"""
@ -123,7 +118,7 @@ server {{
shutil.copy(self.output_dir / filename, backup_file) shutil.copy(self.output_dir / filename, backup_file)
self.logger.info("配置已备份至 %s", backup_file) self.logger.info("配置已备份至 %s", backup_file)
return backup_file return backup_file
except Exception as e: except (OSError, shutil.Error) as e:
self.logger.error("备份失败: %s", str(e)) self.logger.error("备份失败: %s", str(e))
return None return None
@ -139,8 +134,8 @@ server {{
f.write(res) f.write(res)
logger.info("配置已写入 %s", output_file) logger.info("配置已写入 %s", output_file)
return True return True
except Exception as e: except (OSError, shutil.Error) as e:
self.logger.error(f"生成文件失败: {str(e)}") self.logger.error("生成文件失败: %s", str(e))
return False return False
def safe_reload(self) -> bool: def safe_reload(self) -> bool:
@ -151,32 +146,25 @@ server {{
def test_config(self) -> bool: def test_config(self) -> bool:
"""测试Nginx配置""" """测试Nginx配置"""
result = subprocess.run( result = run_cmd_with_log(
["docker", "exec", CONTAINER_NAME, "nginx", "-t"], ["docker", "exec", self.container_name, "nginx", "-t"],
capture_output=True, stderr=None, stdout=None, check=False self.logger
) )
if result.returncode != 0: if result.returncode != 0:
self.logger.error("配置测试失败: %s", result.stderr) self.logger.error("配置测试失败: %s", result.stderr)
return False return False
else: return True
return True
def reload(self) -> bool: def reload(self) -> bool:
"""执行配置重载""" """执行配置重载"""
try: result = run_cmd_with_log(
result = subprocess.run( ["docker", "exec", self.container_name, "nginx", "-s", "reload"],
["docker", "exec", CONTAINER_NAME, "nginx", "-s", "reload"], self.logger)
capture_output=True, stderr=None, stdout=None, check=False if result.returncode != 0:
) self.logger.error("配置重载失败: %s", result.stderr)
if result.returncode != 0:
self.logger.error("配置重载失败: %s", result.stderr)
return False
else:
self.logger.info("配置重载成功")
return True
except subprocess.CalledProcessError as e:
self.logger.error("重载失败: %s", str(e))
return False return False
self.logger.info("配置重载成功")
return True
def rollback(self) -> bool: def rollback(self) -> bool:
"""回滚到最近的有效配置""" """回滚到最近的有效配置"""