Files
wjh/init_sqlite.py
zzy 32090ff06e feat: 添加SQLite数据库支持并迁移数据转换脚本
- 新增init_sqlite.py脚本,用于将MySQL转储文件转换为SQLite数据库
- 实现MySQL到SQLite的语法转换,包括数据类型映射、表结构清理等功能
- 在pom.xml中替换MySQL驱动为SQLite驱动,并添加JDBC依赖
- 更新application.yml配置文件,切换数据库连接从MySQL到SQLite
- 配置HikariCP连接池参数以适配SQLite数据库特性
2026-05-24 01:01:59 +08:00

125 lines
4.6 KiB
Python

"""Convert MySQL dump to SQLite database using executescript."""
import re
import sqlite3
import os
# Input: the MySQL dump lives one directory above this script; the path is
# normalised so the '..' segment is collapsed.
MYSQL_DUMP = os.path.normpath(
    os.path.join(os.path.dirname(__file__), '..', 'psychologicaltreatment.sql')
)
# Output: the SQLite database file is written next to this script.
DB_PATH = os.path.join(os.path.dirname(__file__), 'psychologicaltreatment.db')
def preprocess(sql):
    """Rewrite the text of a MySQL dump so that SQLite can execute it.

    Two passes are made over *sql*:

    1. Lines that start with one of the MySQL session commands listed in
       ``meta_prefixes`` are dropped.
    2. Every ``CREATE TABLE ...;`` statement is rewritten by the nested
       ``clean_create`` helper (table options, comments, charset/collation,
       type names, AUTO_INCREMENT and index definitions).

    Everything else (for example ``INSERT`` statements) is passed through
    unchanged.

    Args:
        sql: Full text of the MySQL dump.

    Returns:
        The converted SQL text, with lines joined by ``'\\n'``.
    """
    meta_prefixes = (
        'SET NAMES', 'SET FOREIGN_KEY_CHECKS', 'SET SQL_MODE',
        'SET TIME_ZONE', 'SET CHARACTER_SET',
    )
    sql = '\n'.join(
        line for line in sql.splitlines()
        if not line.strip().upper().startswith(meta_prefixes)
    )

    # A single-quoted literal with backslash escapes. A doubled quote ('')
    # simply reads as two adjacent literals, which is enough to skip over it.
    string_lit = r"'(?:[^'\\]|\\.)*'"
    # Same, but consuming doubled quotes so a whole COMMENT text is removed.
    comment_lit = r"'(?:[^'\\]|\\.|'')*'"
    ignore_case = re.IGNORECASE

    def bare_word(alternatives):
        """Match one of *alternatives* as a word that is not backtick-quoted.

        The guards keep identifiers such as a column named `date` from being
        mistaken for a type name.
        """
        return r'(?<!`)\b(?:' + alternatives + r')\b(?!`)'

    def clean_create(match):
        """Return the matched CREATE TABLE statement in SQLite syntax."""
        block = match.group(0)

        # Comments go first: their text may contain ';', ')' or keywords that
        # would confuse the patterns below. Covers column-level COMMENT '...'
        # and table-level COMMENT = '...'.
        block = re.sub(r"\s+COMMENT\s*=?\s*" + comment_lit, '', block,
                       flags=ignore_case | re.DOTALL)
        # Remove table-level options: ENGINE=..., AUTO_INCREMENT=N, etc.
        block = re.sub(r'\)\s*ENGINE.*?;', ');', block,
                       flags=ignore_case | re.DOTALL)
        # Remove column-level CHARACTER SET / COLLATE. The name stops at a
        # comma or parenthesis so the column separator is not swallowed.
        block = re.sub(
            r'\s*CHARACTER\s+SET\s+[^\s,)]+(?:\s+COLLATE\s+[^\s,)]+)?',
            '', block, flags=ignore_case)
        block = re.sub(r'\s+COLLATE\s+[^\s,)]+', '', block, flags=ignore_case)
        # Remove index-type hints.
        block = re.sub(r'\s+USING\s+(?:BTREE|HASH)', '', block,
                       flags=ignore_case)
        # Remove ON UPDATE CURRENT_TIMESTAMP[(n)] without eating the
        # whitespace that separates it from whatever follows.
        block = re.sub(
            r'\s+ON\s+UPDATE\s+CURRENT_TIMESTAMP(?:\s*\([^)]*\))?',
            '', block, flags=ignore_case)
        # CURRENT_TIMESTAMP(n) -> CURRENT_TIMESTAMP (SQLite takes no precision).
        block = re.sub(r'\bCURRENT_TIMESTAMP\s*\(\s*\d*\s*\)',
                       'CURRENT_TIMESTAMP', block, flags=ignore_case)

        # MySQL types -> SQLite. UNSIGNED / ZEROFILL are folded into the type
        # so the AUTO_INCREMENT rewrite below still recognises the column.
        block = re.sub(
            bare_word('tinyint|smallint|mediumint|bigint|int')
            + r'(?:\s*\(\s*\d+\s*\))?(?:\s+unsigned)?(?:\s+zerofill)?',
            'INTEGER', block, flags=ignore_case)
        block = re.sub(r'\bvarchar\s*\(\s*\d+\s*\)', 'TEXT', block,
                       flags=ignore_case)
        # The optional "(n)" owns its leading whitespace, so a plain
        # "datetime NOT NULL" keeps the space after the type.
        block = re.sub(bare_word('datetime') + r'(?:\s*\([^)]*\))?', 'TEXT',
                       block, flags=ignore_case)
        block = re.sub(bare_word('date'), 'TEXT', block, flags=ignore_case)
        block = re.sub(bare_word('tinytext|mediumtext|longtext'), 'TEXT',
                       block, flags=ignore_case)

        # AUTO_INCREMENT column -> INTEGER PRIMARY KEY AUTOINCREMENT.
        block, converted = re.subn(
            r'(`\w+`)\s+INTEGER(?:\s+NOT\s+NULL)?\s+AUTO_INCREMENT',
            r'\1 INTEGER PRIMARY KEY AUTOINCREMENT',
            block, flags=ignore_case
        )
        # Any AUTO_INCREMENT left over could not be turned into a primary
        # key; SQLite only accepts AUTOINCREMENT after PRIMARY KEY, so drop it.
        block = re.sub(r'\s*\bAUTO_INCREMENT\b', '', block, flags=ignore_case)
        if converted:
            # The key is now declared inline; remove the separate clause.
            block = re.sub(r'\s*,\s*PRIMARY\s+KEY\s*\(`\w+`\)', '', block,
                           flags=ignore_case)

        # UNIQUE KEY/INDEX `name` (...) -> UNIQUE (...).
        block = re.sub(r'\bUNIQUE\s+(?:KEY|INDEX)\b(?:\s*`[^`]+`)?\s*\(',
                       'UNIQUE (', block, flags=ignore_case)

        # Drop standalone INDEX/KEY lines. The neighbouring comma is left in
        # place: if more constraints follow it is still needed, and if not,
        # the trailing-comma cleanup below removes it.
        index_line = re.compile(r'(?:(?:FULLTEXT|SPATIAL)\s+)?(?:INDEX|KEY)\b',
                                ignore_case)
        block = '\n'.join(
            ln for ln in block.splitlines()
            if not index_line.match(ln.strip())
        )

        # Remove trailing comma before )
        block = re.sub(r',\s*\)', '\n)', block)
        return block

    # A CREATE TABLE statement runs to the first ';' outside a string literal.
    create_stmt = r'CREATE\s+TABLE(?:' + string_lit + r"|[^;'])*;"
    return re.sub(create_stmt, clean_create, sql,
                  flags=ignore_case | re.DOTALL)
def main():
    """Rebuild the SQLite database at DB_PATH from the MySQL dump.

    Reads MYSQL_DUMP, converts it with ``preprocess``, deletes any existing
    database file (and its WAL sidecar files), executes the converted script
    and prints the row count of every table that was created.

    An ``sqlite3.Error`` raised while executing the script is reported on
    stdout and the summary is still printed, so a partially converted dump
    can be inspected. The connection is closed on every exit path.
    """
    with open(MYSQL_DUMP, 'r', encoding='utf-8') as f:
        raw = f.read()
    sqlite_sql = preprocess(raw)

    # Start from a clean slate. The database is opened in WAL mode, so an
    # interrupted earlier run may have left "-wal"/"-shm" files behind; they
    # must not be paired with the freshly created database file.
    for suffix in ('', '-wal', '-shm'):
        try:
            os.remove(DB_PATH + suffix)
        except FileNotFoundError:
            pass

    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute("PRAGMA foreign_keys=OFF;")
        # Use executescript which handles statement splitting properly
        try:
            conn.executescript(sqlite_sql)
        except sqlite3.Error as e:
            print(f"Error during executescript: {e}")
        conn.commit()

        # Verify. sqlite_sequence is SQLite's own AUTOINCREMENT bookkeeping
        # table, so it is left out of both the count and the listing.
        cursor = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
        tables = [row[0] for row in cursor.fetchall()
                  if row[0] != 'sqlite_sequence']
        print(f"Database: {DB_PATH}")
        print(f"Tables created: {len(tables)}")
        for t in tables:
            # Double any embedded double quote so the name stays one identifier.
            quoted = t.replace('"', '""')
            count = conn.execute(
                f'SELECT COUNT(*) FROM "{quoted}"').fetchone()[0]
            print(f"  {t}: {count} rows")
    finally:
        conn.close()
# Run the conversion only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()