"""SQLite-backed bookkeeping for staged log files.

Two tables live in the database at ``LOG_DB_PATH``:

* ``logs`` -- one row per tracked file with creation time, last access
  time and an access counter.
* ``refs`` -- one row per tracked file with a reference count.

NOTE: importing this module calls ``_ensure_db(reset=True)``, which
creates the database if needed and clears both tables.
"""

import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable, Iterator, List, Dict, Union

LOG_DB_PATH = Path("./instance/mathstream_logs.sqlite")


def _is_relative_to(path: Path, other: Path) -> bool:
    """Return True when the resolved *path* lies at or below the resolved *other*."""
    try:
        path.resolve().relative_to(other.resolve())
        return True
    except ValueError:
        return False


def _normalize_paths(paths: Iterable[Path]) -> List[str]:
    """Return each path as a resolved (absolute) string, in input order."""
    return [str(Path(p).resolve()) for p in paths]


@contextmanager
def _connect() -> Iterator[sqlite3.Connection]:
    """Yield a connection to ``LOG_DB_PATH`` that is committed and then closed.

    ``sqlite3.Connection`` used directly as a context manager only commits
    or rolls back; it does not close the connection. This wrapper keeps the
    commit/rollback behaviour and guarantees the handle is released.
    """
    conn = sqlite3.connect(LOG_DB_PATH)
    try:
        with conn:  # commit on success, roll back if the body raises
            yield conn
    finally:
        conn.close()


def _now() -> float:
    """Return the current UTC time as a POSIX timestamp."""
    return datetime.now(timezone.utc).timestamp()


def _ensure_db(reset: bool = False) -> None:
    """Create the database directory and tables if they do not exist.

    Args:
        reset: When true, delete every row from both tables afterwards.
    """
    LOG_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    with _connect() as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS logs (
                path TEXT PRIMARY KEY,
                created_at REAL,
                last_access REAL,
                access_count INTEGER DEFAULT 0
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS refs (
                path TEXT PRIMARY KEY,
                ref_count INTEGER DEFAULT 0
            )
            """
        )
        if reset:
            conn.execute("DELETE FROM logs")
            conn.execute("DELETE FROM refs")


# Start every process with empty bookkeeping tables.
_ensure_db(reset=True)


def register_log_file(path: Path) -> None:
    """Ensure the log database is aware of a file's existence.

    Inserts rows with zero access and reference counts; rows that already
    exist for the path are left untouched.
    """
    normalized = _normalize_paths([path])[0]
    _ensure_db()
    timestamp = _now()
    with _connect() as conn:
        conn.execute(
            """
            INSERT INTO logs (path, created_at, last_access, access_count)
            VALUES (?, ?, ?, 0)
            ON CONFLICT(path) DO NOTHING
            """,
            (normalized, timestamp, timestamp),
        )
        conn.execute(
            """
            INSERT INTO refs (path, ref_count)
            VALUES (?, 0)
            ON CONFLICT(path) DO NOTHING
            """,
            (normalized,),
        )


def register_reference(path: Path) -> None:
    """Increment reference count similarly to Python's ref counter.

    Each call adds one to the path's reference count and one to its access
    count, and refreshes ``last_access``. Unknown paths are created with
    both counters at 1.
    """
    normalized = _normalize_paths([path])[0]
    _ensure_db()
    timestamp = _now()
    with _connect() as conn:
        # A single upsert so that a brand-new row is counted exactly once
        # (insert-then-update would record two accesses for the first call).
        conn.execute(
            """
            INSERT INTO logs (path, created_at, last_access, access_count)
            VALUES (?, ?, ?, 1)
            ON CONFLICT(path) DO UPDATE SET
                last_access = excluded.last_access,
                access_count = logs.access_count + 1
            """,
            (normalized, timestamp, timestamp),
        )
        conn.execute(
            """
            INSERT INTO refs (path, ref_count)
            VALUES (?, 1)
            ON CONFLICT(path) DO UPDATE SET ref_count = ref_count + 1
            """,
            (normalized,),
        )


def touch_log_file(path: Path) -> None:
    """Refresh access metadata when a file is streamed.

    Bumps the access counter and ``last_access``; unknown paths are created
    with an access count of 1. The reference count is not modified.
    """
    normalized = _normalize_paths([path])[0]
    _ensure_db()
    timestamp = _now()
    with _connect() as conn:
        conn.execute(
            """
            INSERT INTO logs (path, created_at, last_access, access_count)
            VALUES (?, ?, ?, 1)
            ON CONFLICT(path) DO UPDATE SET
                last_access = excluded.last_access,
                access_count = logs.access_count + 1
            """,
            (normalized, timestamp, timestamp),
        )


def wipe_log_records() -> None:
    """Drop all bookkeeping (used after manual log purges)."""
    _ensure_db()
    with _connect() as conn:
        conn.execute("DELETE FROM logs")
        conn.execute("DELETE FROM refs")


def _delete_records(paths: List[Path]) -> None:
    """Delete the ``logs`` and ``refs`` rows for each of *paths* (no-op if empty)."""
    if not paths:
        return
    normalized = [(str(p.resolve()),) for p in paths]
    with _connect() as conn:
        conn.executemany("DELETE FROM logs WHERE path = ?", normalized)
        conn.executemany("DELETE FROM refs WHERE path = ?", normalized)


def collect_garbage(score_threshold: float) -> list[Path]:
    """Remove seldom-used staged files based on an age/refcount score.

    The score of a file is
    ``age / ((ref_count + 1) * (access_count + 1))`` where ``age`` is the
    number of seconds since its last access (or creation). Files scoring at
    or above *score_threshold* are unlinked and their records deleted.
    Files that exist but cannot be unlinked are skipped and stay tracked.

    Args:
        score_threshold: Minimum score at which a file is collected.

    Returns:
        The paths whose records were removed.

    Raises:
        ValueError: If *score_threshold* is negative.
    """
    if score_threshold < 0:
        raise ValueError("score_threshold must be non-negative")
    _ensure_db()
    now = _now()
    with _connect() as conn:
        rows = conn.execute(
            """
            SELECT l.path,
                   COALESCE(l.created_at, ?),
                   COALESCE(l.last_access, l.created_at, ?),
                   COALESCE(l.access_count, 0),
                   COALESCE(r.ref_count, 0)
            FROM logs l
            LEFT JOIN refs r ON l.path = r.path
            """,
            (now, now),
        ).fetchall()

    removed: list[Path] = []
    for path_str, created_at, last_access, access_count, ref_count in rows:
        path = Path(path_str)
        age = now - (last_access or created_at or now)
        score = age / ((ref_count + 1) * (access_count + 1))
        if score < score_threshold:
            continue
        if path.exists():
            try:
                path.unlink()
            except OSError:
                # Could not delete the file; keep its record for a later pass.
                continue
        removed.append(path)

    _delete_records(removed)
    return removed


def release_reference(path: Path, delete_file: bool = True) -> bool:
    """Decrease the reference count and optionally delete the file when it hits zero.

    Args:
        path: File whose reference is being released.
        delete_file: Whether to unlink the file once no references remain.

    Returns:
        True only when the file was actually unlinked by this call. False if
        the path is not tracked, references remain, deletion was not
        requested, the file does not exist, or unlinking failed.
    """
    normalized = _normalize_paths([path])[0]
    _ensure_db()
    with _connect() as conn:
        row = conn.execute(
            "SELECT ref_count FROM refs WHERE path = ?", (normalized,)
        ).fetchone()
        if row is None:
            return False
        current = row[0] or 0
        new_count = max(current - 1, 0)
        if new_count > 0:
            conn.execute(
                "UPDATE refs SET ref_count = ? WHERE path = ?",
                (new_count, normalized),
            )
            return False
        # Last reference released: forget the file entirely.
        conn.execute("DELETE FROM refs WHERE path = ?", (normalized,))
        conn.execute("DELETE FROM logs WHERE path = ?", (normalized,))

    # Wrap in Path so a str argument (already accepted by normalization
    # above) works for the filesystem calls as well.
    target = Path(path)
    removed = False
    if delete_file and target.exists():
        try:
            target.unlink()
            removed = True
        except OSError:
            removed = False
    return removed


def tracked_files() -> Dict[str, int]:
    """Return a mapping of tracked file paths to their reference counts."""
    _ensure_db()
    with _connect() as conn:
        rows = conn.execute("SELECT path, ref_count FROM refs").fetchall()
    return {path: ref_count for path, ref_count in rows}


def instance_stats(
    root: Union[str, Path] = Path("instance"),
) -> Dict[str, Union[str, int, bool]]:
    """Return simple usage statistics for the staging directory.

    Args:
        root: Directory to scan recursively.

    Returns:
        A dict with keys ``root`` (resolved path string when the directory
        exists), ``exists``, ``total_files``, ``total_bytes``, ``log_files``
        and ``log_bytes``. The ``log_*`` entries count files located under
        ``<root>/log``. All counters are 0 when *root* does not exist.
    """
    base = Path(root)
    exists = base.exists()
    resolved = base.resolve() if exists else base

    total_files = total_bytes = log_files = log_bytes = 0
    if exists:
        # Resolve the log directory once instead of once per scanned file.
        log_root = (base / "log").resolve()
        for candidate in base.rglob("*"):
            if not candidate.is_file():
                continue
            try:
                size = candidate.stat().st_size
            except OSError:
                # The file vanished (or became unreadable) mid-scan.
                continue
            total_files += 1
            total_bytes += size
            if candidate.resolve().is_relative_to(log_root):
                log_files += 1
                log_bytes += size

    return {
        "root": str(resolved),
        "exists": exists,
        "total_files": total_files,
        "total_bytes": total_bytes,
        "log_files": log_files,
        "log_bytes": log_bytes,
    }