mathy/mathstream/utils.py
2025-11-05 16:35:15 +01:00

259 lines
7.7 KiB
Python

import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Iterable, List, Union
# SQLite database file that stores the ``logs`` (access metadata) and ``refs``
# (reference counts) tables used by every function in this module.
# The path is relative, so it is resolved against the current working directory.
LOG_DB_PATH = Path("./instance/mathstream_logs.sqlite")
def _is_relative_to(path: Path, other: Path) -> bool:
try:
path.resolve().relative_to(other.resolve())
return True
except ValueError:
return False
def _normalize_paths(paths: Iterable[Path]) -> List[str]:
return [str(Path(p).resolve()) for p in paths]
def _ensure_db(reset: bool = False) -> None:
    """Create the bookkeeping database and its tables if they are missing.

    The parent directory of ``LOG_DB_PATH`` is created when necessary, then the
    ``logs`` and ``refs`` tables are created if they do not exist yet.

    Args:
        reset: When true, every row of ``logs`` and ``refs`` is deleted after
            the tables have been ensured.
    """
    LOG_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    # sqlite3's own context manager only commits or rolls back the transaction;
    # ``closing`` is what actually releases the connection on exit.
    with closing(sqlite3.connect(LOG_DB_PATH)) as conn, conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS logs (
                path TEXT PRIMARY KEY,
                created_at REAL,
                last_access REAL,
                access_count INTEGER DEFAULT 0
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS refs (
                path TEXT PRIMARY KEY,
                ref_count INTEGER DEFAULT 0
            )
            """
        )
        if reset:
            conn.execute("DELETE FROM logs")
            conn.execute("DELETE FROM refs")
# Runs when the module is first imported: creates the database if needed and
# deletes every row from both tables, so bookkeeping starts out empty.
# NOTE(review): presumably intentional (records scoped to one process run) --
# confirm, because importing this module from a second process sharing the
# same working directory clears the first process's records as well.
_ensure_db(reset=True)
def register_log_file(path: Path) -> None:
    """Ensure the log database is aware of a file's existence.

    Inserts a ``logs`` row (zero accesses, creation and last-access time set to
    now) and a ``refs`` row (zero references) for the resolved path. Rows that
    already exist are left untouched.

    Args:
        path: File to start tracking; it is resolved to an absolute path first.
    """
    normalized = _normalize_paths([path])[0]
    _ensure_db()
    timestamp = datetime.now(timezone.utc).timestamp()
    # ``closing`` releases the connection (sqlite3's context manager does not);
    # the inner ``conn`` context commits on success and rolls back on error.
    with closing(sqlite3.connect(LOG_DB_PATH)) as conn, conn:
        conn.execute(
            """
            INSERT INTO logs (path, created_at, last_access, access_count)
            VALUES (?, ?, ?, 0)
            ON CONFLICT(path)
            DO NOTHING
            """,
            (normalized, timestamp, timestamp),
        )
        conn.execute(
            """
            INSERT INTO refs (path, ref_count)
            VALUES (?, 0)
            ON CONFLICT(path)
            DO NOTHING
            """,
            (normalized,),
        )
def register_reference(path: Path) -> None:
    """Increment reference count similarly to Python's ref counter.

    Each call counts as exactly one access and one additional reference for
    the resolved path, creating the ``logs`` and ``refs`` rows when missing.

    Args:
        path: File being referenced; it is resolved to an absolute path first.
    """
    normalized = _normalize_paths([path])[0]
    _ensure_db()
    timestamp = datetime.now(timezone.utc).timestamp()
    # ``closing`` releases the connection (sqlite3's context manager does not);
    # the inner ``conn`` context commits on success and rolls back on error.
    with closing(sqlite3.connect(LOG_DB_PATH)) as conn, conn:
        # Single upsert: a brand-new path starts at access_count = 1 and an
        # existing one is bumped by 1. (An insert-with-1 followed by an
        # unconditional "+ 1" update would count the first access twice.)
        conn.execute(
            """
            INSERT INTO logs (path, created_at, last_access, access_count)
            VALUES (?, ?, ?, 1)
            ON CONFLICT(path)
            DO UPDATE SET
                last_access = excluded.last_access,
                access_count = logs.access_count + 1
            """,
            (normalized, timestamp, timestamp),
        )
        conn.execute(
            """
            INSERT INTO refs (path, ref_count)
            VALUES (?, 1)
            ON CONFLICT(path)
            DO UPDATE SET ref_count = ref_count + 1
            """,
            (normalized,),
        )
def touch_log_file(path: Path) -> None:
    """Refresh access metadata when a file is streamed.

    Sets ``last_access`` to the current UTC timestamp and adds one to
    ``access_count``; a path that is not tracked yet gets a new ``logs`` row
    with an access count of 1. The ``refs`` table is not modified.

    Args:
        path: File that was accessed; it is resolved to an absolute path first.
    """
    normalized = _normalize_paths([path])[0]
    _ensure_db()
    timestamp = datetime.now(timezone.utc).timestamp()
    # ``closing`` releases the connection (sqlite3's context manager does not);
    # the inner ``conn`` context commits on success and rolls back on error.
    with closing(sqlite3.connect(LOG_DB_PATH)) as conn, conn:
        conn.execute(
            """
            INSERT INTO logs (path, created_at, last_access, access_count)
            VALUES (?, ?, ?, 1)
            ON CONFLICT(path)
            DO UPDATE SET
                last_access = excluded.last_access,
                access_count = logs.access_count + 1
            """,
            (normalized, timestamp, timestamp),
        )
def wipe_log_records() -> None:
    """Drop all bookkeeping (used after manual log purges).

    Deletes every row from the ``logs`` and ``refs`` tables; the tables
    themselves and any files on disk are left in place.
    """
    _ensure_db()
    # ``closing`` releases the connection (sqlite3's context manager does not);
    # the inner ``conn`` context commits both deletes together.
    with closing(sqlite3.connect(LOG_DB_PATH)) as conn, conn:
        conn.execute("DELETE FROM logs")
        conn.execute("DELETE FROM refs")
def _delete_records(paths: List[Path]) -> None:
    """Delete the ``logs`` and ``refs`` rows belonging to ``paths``.

    Args:
        paths: Paths whose records should be removed. Each one is resolved
            before being matched against the stored path strings. An empty
            list is a no-op and does not open the database.
    """
    if not paths:
        return
    normalized = [(str(p.resolve()),) for p in paths]
    # ``closing`` releases the connection (sqlite3's context manager does not);
    # the inner ``conn`` context commits both batches together.
    with closing(sqlite3.connect(LOG_DB_PATH)) as conn, conn:
        conn.executemany("DELETE FROM logs WHERE path = ?", normalized)
        conn.executemany("DELETE FROM refs WHERE path = ?", normalized)
def collect_garbage(score_threshold: float) -> list[Path]:
    """Remove seldom-used staged files based on an age/refcount score.

    Every tracked path is scored as
    ``age / ((ref_count + 1) * (access_count + 1))``, where ``age`` is the
    number of seconds since the last access (falling back to creation time).
    Paths scoring at or above ``score_threshold`` are unlinked and their
    records are dropped from the database.

    Args:
        score_threshold: Minimum score at which a path is collected.

    Returns:
        The paths whose records were removed. This includes tracked paths that
        were already missing on disk; paths that could not be unlinked are
        skipped and stay tracked.

    Raises:
        ValueError: If ``score_threshold`` is negative.
    """
    if score_threshold < 0:
        raise ValueError("score_threshold must be non-negative")
    _ensure_db()
    now = datetime.now(timezone.utc).timestamp()
    # Read-only query: ``closing`` is enough and guarantees the connection is
    # released (sqlite3's own context manager would leave it open).
    with closing(sqlite3.connect(LOG_DB_PATH)) as conn:
        rows = conn.execute(
            """
            SELECT
                l.path,
                COALESCE(l.created_at, ?),
                COALESCE(l.last_access, l.created_at, ?),
                COALESCE(l.access_count, 0),
                COALESCE(r.ref_count, 0)
            FROM logs l
            LEFT JOIN refs r ON l.path = r.path
            """,
            (now, now),
        ).fetchall()
    removed: list[Path] = []
    for path_str, created_at, last_access, access_count, ref_count in rows:
        path = Path(path_str)
        age = now - (last_access or created_at or now)
        score = age / ((ref_count + 1) * (access_count + 1))
        if score < score_threshold:
            continue
        try:
            path.unlink()
        except FileNotFoundError:
            # Already gone from disk: nothing to delete, but the stale record
            # should still be dropped below.
            pass
        except OSError:
            # Could not delete the file; keep tracking it and move on.
            continue
        removed.append(path)
    _delete_records(removed)
    return removed
def release_reference(path: Path, delete_file: bool = True) -> bool:
    """Decrease the reference count and optionally delete the file when it hits zero.

    When the decremented count is still positive, only the stored count is
    updated. When it reaches zero, the path's ``refs`` and ``logs`` rows are
    deleted and, if requested, the file itself is unlinked.

    Args:
        path: File whose reference is being released.
        delete_file: Whether to unlink the file once no references remain.

    Returns:
        ``True`` only when the file was actually deleted from disk; ``False``
        when the path is untracked, references remain, deletion was not
        requested, or the file was missing or could not be removed.
    """
    normalized = _normalize_paths([path])[0]
    _ensure_db()
    # ``closing`` releases the connection (sqlite3's context manager does not);
    # the inner ``conn`` context commits on every exit path, early returns
    # included, and rolls back if a statement raises.
    with closing(sqlite3.connect(LOG_DB_PATH)) as conn, conn:
        row = conn.execute(
            "SELECT ref_count FROM refs WHERE path = ?", (normalized,)
        ).fetchone()
        if row is None:
            return False
        # A NULL count is treated as zero, and the count never goes negative.
        new_count = max((row[0] or 0) - 1, 0)
        if new_count > 0:
            conn.execute(
                "UPDATE refs SET ref_count = ? WHERE path = ?",
                (new_count, normalized),
            )
            return False
        conn.execute("DELETE FROM refs WHERE path = ?", (normalized,))
        conn.execute("DELETE FROM logs WHERE path = ?", (normalized,))
    if not delete_file:
        return False
    try:
        path.unlink()
    except OSError:
        # Covers a file that is already missing as well as one that cannot be
        # removed; either way nothing was deleted by this call.
        return False
    return True
def tracked_files() -> Dict[str, int]:
    """Return a mapping of tracked file paths to their reference counts.

    Returns:
        One entry per row of the ``refs`` table, keyed by the stored path
        string.
    """
    _ensure_db()
    # Read-only query: ``closing`` guarantees the connection is released
    # (sqlite3's own context manager would leave it open).
    with closing(sqlite3.connect(LOG_DB_PATH)) as conn:
        rows = conn.execute("SELECT path, ref_count FROM refs").fetchall()
    return dict(rows)
def instance_stats(root: Union[str, Path] = Path("instance")) -> Dict[str, Union[str, int, bool]]:
    """Return simple usage statistics for the staging directory.

    Args:
        root: Directory to scan recursively.

    Returns:
        A dict with the keys ``root`` (resolved path string when the directory
        exists, otherwise the path as given), ``exists``, ``total_files``,
        ``total_bytes``, ``log_files`` and ``log_bytes``. The ``log_*`` values
        cover only files located under ``<root>/log``. All counters are zero
        when ``root`` does not exist.
    """
    base = Path(root)
    exists = base.exists()
    total_files = total_bytes = log_files = log_bytes = 0
    if exists:
        # Resolve the log directory once instead of once per scanned file.
        log_root = (base / "log").resolve()
        for candidate in base.rglob("*"):
            if not candidate.is_file():
                continue
            try:
                size = candidate.stat().st_size
            except OSError:
                # The file vanished (or became unreadable) between the
                # is_file() check and stat(); leave it out of the totals.
                continue
            total_files += 1
            total_bytes += size
            if candidate.resolve().is_relative_to(log_root):
                log_files += 1
                log_bytes += size
    return {
        "root": str(base.resolve() if exists else base),
        "exists": exists,
        "total_files": total_files,
        "total_bytes": total_bytes,
        "log_files": log_files,
        "log_bytes": log_bytes,
    }