"""File-backed number streams with in-process reference counting.

A ``StreamNumber`` wraps a text file (either an existing one or one created
from an integer literal under ``LOG_DIR``) and tracks how many live instances
point at each resolved path.
"""

import hashlib
import weakref
from collections import Counter
from pathlib import Path
from typing import Dict, Optional, Union

from .utils import (
    register_log_file,
    register_reference,
    touch_log_file,
    release_reference,
)

LOG_DIR = Path("./instance/log")

# When True, garbage collection of a StreamNumber does not release its
# reference; only an explicit ``free()`` does (see ``_finalize_instance``).
_MANUAL_FREE_ONLY = False


def _ensure_log_dir() -> None:
    """Create ``LOG_DIR`` (and any missing parents) if it does not exist."""
    LOG_DIR.mkdir(parents=True, exist_ok=True)


def _canonicalize_literal(value: str) -> str:
    """Return the canonical decimal form of an integer literal string.

    Surrounding whitespace, a leading ``+`` and leading zeros are removed;
    zero is always rendered as ``"0"`` without a sign.

    Raises:
        ValueError: if ``value`` is blank or is not an optionally signed
            run of digits.
    """
    raw = value.strip()
    if not raw:
        raise ValueError("Literal value cannot be empty.")

    sign = ""
    digits = raw
    if raw[0] in "+-":
        sign = "-" if raw[0] == "-" else ""
        digits = raw[1:]

    if not digits or not digits.isdigit():
        raise ValueError(f"Literal must be an integer string, got: {value!r}")

    digits = digits.lstrip("0") or "0"
    if digits == "0":
        # Avoid producing "-0".
        sign = ""
    return f"{sign}{digits}"


def _is_in_log_dir(path: Path) -> bool:
    """Return True if the resolved ``path`` lies inside the resolved ``LOG_DIR``."""
    try:
        path.resolve().relative_to(LOG_DIR.resolve())
    except ValueError:
        # relative_to() raises ValueError when path is not under LOG_DIR.
        return False
    return True


class StreamNumber:
    """A number stored in a text file and read back in chunks.

    Exactly one of ``file_path`` or ``literal`` must be given. A literal is
    canonicalized and written to ``LOG_DIR / "literal_<sha1-prefix>.txt"``;
    a ``file_path`` must already exist.

    Instances can be used as context managers; leaving the ``with`` block
    calls ``free()``.
    """

    def __init__(
        self,
        file_path: Optional[Union[str, Path]] = None,
        *,
        literal: Optional[str] = None,
    ):
        """Bind the instance to a backing file and register it as active.

        Raises:
            ValueError: if both or neither of ``file_path`` / ``literal`` are
                given, or if ``literal`` is not a valid integer string.
            FileNotFoundError: if ``file_path`` does not exist.
        """
        if (file_path is None) == (literal is None):
            raise ValueError("Provide exactly one of file_path or literal.")

        if literal is not None:
            normalized = _canonicalize_literal(literal)
            _ensure_log_dir()
            # The file name is derived from the canonical text, so equal
            # literals map to the same file.
            literal_hash = hashlib.sha1(normalized.encode()).hexdigest()[:10]
            self.path = LOG_DIR / f"literal_{literal_hash}.txt"
            self.path.write_text(normalized, encoding="utf-8")
        else:
            self.path = Path(file_path)
            if not self.path.exists():
                raise FileNotFoundError(self.path)

        # Short identifier derived from the path string as given (not the
        # resolved path); used to name stage files in write_stage().
        self.hash = hashlib.sha1(str(self.path).encode()).hexdigest()[:10]
        self._normalized_path = str(self.path.resolve())
        self._released = False

        _increment_active(self.path)
        # NOTE(review): register_log_file / register_reference come from
        # .utils and their bookkeeping is not visible here. They are only
        # called for files under LOG_DIR, matching the guard applied before
        # release_reference() in _decrement_active().
        if _is_in_log_dir(self.path):
            register_log_file(self.path)
            register_reference(self.path)

        # The finalizer receives only the path string, so it holds no
        # reference to ``self`` and cannot keep the instance alive.
        self._finalizer = weakref.finalize(
            self, _finalize_instance, self._normalized_path
        )

    def __repr__(self) -> str:
        """Return a short debug representation that includes the backing path."""
        return f"<StreamNumber {self.path}>"

    def stream(self, chunk_size=4096):
        """Yield chunks of digits as strings.

        The file is read as UTF-8 text, ``chunk_size`` characters at a time.
        Each chunk has leading/trailing whitespace stripped and every ``,``
        replaced by ``.`` before it is yielded.

        This is a generator: nothing (including the ``touch_log_file`` call
        for files under ``LOG_DIR``) happens until iteration starts.
        """
        if _is_in_log_dir(self.path):
            touch_log_file(self.path)
        with open(self.path, "r", encoding="utf-8") as f:
            while chunk := f.read(chunk_size):
                yield chunk.strip().replace(",", ".")

    def write_stage(self, stage, data: str) -> Path:
        """Write intermediate stage result.

        ``data`` is encoded with ``str.encode()`` and written to
        ``LOG_DIR / "<hash>_stage_<stage>.bin"``, which is then passed to
        ``register_log_file``.

        Returns:
            The path of the stage file that was written.
        """
        _ensure_log_dir()
        stage_file = LOG_DIR / f"{self.hash}_stage_{stage}.bin"
        with open(stage_file, "wb") as f:
            f.write(data.encode())
        register_log_file(stage_file)
        return stage_file

    def free(self, *, delete_file: bool = True) -> None:
        """Release this stream's reference and optionally delete the staged file.

        Calling ``free()`` more than once is a no-op after the first call.
        ``delete_file`` is forwarded to ``release_reference`` for files under
        ``LOG_DIR``.
        """
        if self._released:
            return
        self._released = True
        # Detach so the finalizer does not decrement a second time when the
        # instance is later garbage collected.
        if self._finalizer.alive:
            self._finalizer.detach()
        _decrement_active(Path(self._normalized_path), delete_file=delete_file)

    def __enter__(self):
        """Return ``self`` so the instance can be bound in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Free the stream on leaving the ``with`` block; exceptions propagate."""
        self.free()


# Resolved path string -> number of StreamNumber instances registered for it.
_ACTIVE_COUNTER: Counter[str] = Counter()


def _increment_active(path: Path) -> None:
    """Add one active reference for the resolved form of ``path``."""
    key = str(path.resolve())
    _ACTIVE_COUNTER[key] += 1


def _decrement_active(path: Path, delete_file: bool = True) -> None:
    """Drop one active reference for ``path``.

    The counter entry is removed once the count would reach zero. For paths
    under ``LOG_DIR``, ``release_reference`` is called on every decrement with
    ``delete_file`` forwarded.
    """
    key = str(path.resolve())
    current = _ACTIVE_COUNTER.get(key, 0)
    if current <= 1:
        # Also covers a missing key, so a stray release never goes negative.
        _ACTIVE_COUNTER.pop(key, None)
    else:
        _ACTIVE_COUNTER[key] = current - 1

    if _is_in_log_dir(path):
        release_reference(path, delete_file=delete_file)


def _finalize_instance(path_str: str) -> None:
    """Finalizer callback registered via ``weakref.finalize`` in ``__init__``.

    Does nothing while manual-free-only mode is enabled; otherwise releases
    the reference with the default ``delete_file=True``.
    """
    if _MANUAL_FREE_ONLY:
        return
    _decrement_active(Path(path_str))


def free_stream(number: StreamNumber, *, delete_file: bool = True) -> None:
    """Convenience helper mirroring manual memory management semantics."""
    number.free(delete_file=delete_file)


def active_streams() -> Dict[str, int]:
    """Return the active StreamNumber paths mapped to in-memory reference counts.

    The result is a copy; mutating it does not affect the internal counter.
    """
    return dict(_ACTIVE_COUNTER)


def set_manual_free_only(enabled: bool) -> None:
    """Toggle whether garbage collection happens only via explicit .free() calls."""
    global _MANUAL_FREE_ONLY
    _MANUAL_FREE_ONLY = bool(enabled)


def manual_free_only_enabled() -> bool:
    """Return the current manual-free-only toggle."""
    return _MANUAL_FREE_ONLY