"""Convert MySQL dump to SQLite database using executescript.""" import re import sqlite3 import os MYSQL_DUMP = os.path.join(os.path.dirname(__file__), '..', 'psychologicaltreatment.sql') MYSQL_DUMP = os.path.normpath(MYSQL_DUMP) DB_PATH = os.path.join(os.path.dirname(__file__), 'psychologicaltreatment.db') def preprocess(sql): """Remove MySQL-specific syntax, returns SQLite-compatible SQL.""" # Remove lines that are MySQL meta commands lines = [] for line in sql.splitlines(): stripped = line.strip().upper() if any(stripped.startswith(x) for x in ['SET NAMES', 'SET FOREIGN_KEY_CHECKS', 'SET SQL_MODE', 'SET TIME_ZONE', 'SET CHARACTER_SET']): continue lines.append(line) sql = '\n'.join(lines) # Process each CREATE TABLE separately # Strategy: find each CREATE TABLE block, clean it, replace it def clean_create(match): block = match.group(0) # Remove table-level options: ENGINE=..., AUTO_INCREMENT=N, etc. block = re.sub(r'\)\s*ENGINE.*?;', ');', block, flags=re.IGNORECASE | re.DOTALL) # Remove inline CHARACTER SET / COLLATE (column-level) block = re.sub(r'\s*CHARACTER\s+SET\s+\S+(?:\s+COLLATE\s+\S+)?', '', block, flags=re.IGNORECASE) block = re.sub(r'\s+COLLATE\s+\S+', '', block, flags=re.IGNORECASE) # Remove inline COMMENT '...' block = re.sub(r"\s+COMMENT\s+'[^']*'", '', block, flags=re.IGNORECASE) # Remove USING BTREE block = re.sub(r'\s+USING\s+BTREE', '', block, flags=re.IGNORECASE) # Remove ON UPDATE CURRENT_TIMESTAMP block = re.sub(r'\s+ON\s+UPDATE\s+CURRENT_TIMESTAMP\s*(?:\([^)]*\))?', '', block, flags=re.IGNORECASE) # MySQL types -> SQLite block = re.sub(r'\b(tinyint|smallint|mediumint|bigint|int)\s*\(\s*\d+\s*\)', 'INTEGER', block, flags=re.IGNORECASE) block = re.sub(r'\b(tinyint|smallint|mediumint|bigint|int)\b', 'INTEGER', block, flags=re.IGNORECASE) block = re.sub(r'\bvarchar\s*\(\s*\d+\s*\)', 'TEXT', block, flags=re.IGNORECASE) block = re.sub(r'\bdatetime\s*(?:\([^)]*\))?', 'TEXT', block, flags=re.IGNORECASE) block = re.sub(r'\bdate\b', 'TEXT', block, flags=re.IGNORECASE) block = re.sub(r'\b(tinytext|mediumtext|longtext)\b', 'TEXT', block, flags=re.IGNORECASE) # AUTO_INCREMENT -> AUTOINCREMENT as INTEGER PRIMARY KEY block = re.sub( r'(`\w+`)\s+INTEGER\s+NOT\s+NULL\s+AUTO_INCREMENT', r'\1 INTEGER PRIMARY KEY AUTOINCREMENT', block, flags=re.IGNORECASE ) block = re.sub(r'\bAUTO_INCREMENT\b', 'AUTOINCREMENT', block, flags=re.IGNORECASE) if 'AUTOINCREMENT' in block: block = re.sub(r'\s*,\s*PRIMARY\s+KEY\s*\(`\w+`\)', '', block, flags=re.IGNORECASE) # Remove standalone INDEX/KEY lines lines2 = block.splitlines() result2 = [] for ln in lines2: s = ln.strip() if re.match(r'^\s*(INDEX|KEY)\s+', s, re.IGNORECASE) and \ not re.match(r'^\s*(PRIMARY|UNIQUE)\s', s, re.IGNORECASE): # Remove trailing comma from previous line if result2 and result2[-1].strip().endswith(','): result2[-1] = result2[-1].rstrip(',') continue result2.append(ln) block = '\n'.join(result2) # Remove trailing comma before ) block = re.sub(r',\s*\)', '\n)', block) return block # Apply clean_create to all CREATE TABLE blocks sql = re.sub(r'CREATE\s+TABLE.*?;', clean_create, sql, flags=re.IGNORECASE | re.DOTALL) return sql def main(): with open(MYSQL_DUMP, 'r', encoding='utf-8') as f: raw = f.read() sqlite_sql = preprocess(raw) if os.path.exists(DB_PATH): os.remove(DB_PATH) conn = sqlite3.connect(DB_PATH) conn.execute("PRAGMA journal_mode=WAL;") conn.execute("PRAGMA foreign_keys=OFF;") # Use executescript which handles statement splitting properly try: conn.executescript(sqlite_sql) except sqlite3.Error as e: print(f"Error during executescript: {e}") conn.commit() # Verify cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name") tables = [row[0] for row in cursor.fetchall()] print(f"Database: {DB_PATH}") print(f"Tables created: {len(tables)}") for t in tables: if t == 'sqlite_sequence': continue count = conn.execute(f'SELECT COUNT(*) FROM "{t}"').fetchone()[0] print(f" {t}: {count} rows") conn.close() if __name__ == '__main__': main()