mathy/mathstream/number.py
2025-11-05 14:27:58 +01:00

218 lines
6.2 KiB
Python

import hashlib
import weakref
from collections import Counter
from pathlib import Path
from typing import Dict, Optional, Union, Any
from .engine import add, sub, mul, div, mod, pow
from .utils import (
register_log_file,
register_reference,
touch_log_file,
release_reference,
)
# Directory that receives literal files and intermediate stage files.
LOG_DIR = Path("./instance/log")
# When True, the weakref finalizer of a collected StreamNumber does nothing,
# so references are only released through explicit free() calls.
_MANUAL_FREE_ONLY = False
def _ensure_log_dir() -> None:
    """Create the log directory (and any missing parents) if it is absent."""
    LOG_DIR.mkdir(exist_ok=True, parents=True)
def _canonicalize_literal(value: str) -> str:
raw = value.strip()
if not raw:
raise ValueError("Literal value cannot be empty.")
sign = ""
digits = raw
if raw[0] in "+-":
sign = "-" if raw[0] == "-" else ""
digits = raw[1:]
if not digits or not digits.isdigit():
raise ValueError(f"Literal must be an integer string, got: {value!r}")
digits = digits.lstrip("0") or "0"
if digits == "0":
sign = ""
return f"{sign}{digits}"
def _is_in_log_dir(path: Path) -> bool:
    """Report whether *path*, once resolved, lies inside the log directory."""
    try:
        candidate = path.resolve()
        log_root = LOG_DIR.resolve()
        # relative_to() raises ValueError when candidate is outside log_root.
        candidate.relative_to(log_root)
    except ValueError:
        return False
    else:
        return True
class StreamNumber:
    """Number whose digits live in a text file and are read in chunks.

    An instance is built either from an existing file or from a literal
    string; a literal is canonicalized and written to a file inside the log
    directory. Every instance is counted as active for its path until
    ``free()`` is called or the object is finalized. Arithmetic operators
    delegate to the engine functions after coercing the other operand.
    """

    def __init__(
        self,
        file_path: Optional[Union[str, Path]] = None,
        *,
        literal: Optional[str] = None,
    ):
        """Bind the instance to a number file.

        Args:
            file_path: Path of an existing file holding the digits.
            literal: Integer literal to store in a new file instead.

        Raises:
            ValueError: If both or neither argument is given, or the literal
                is not a valid integer string.
            FileNotFoundError: If *file_path* does not exist.
        """
        has_path = file_path is not None
        has_literal = literal is not None
        if has_path == has_literal:
            raise ValueError("Provide exactly one of file_path or literal.")

        if has_literal:
            canonical = _canonicalize_literal(literal)
            _ensure_log_dir()
            # The file name is derived from the canonical text itself.
            literal_digest = hashlib.sha1(canonical.encode()).hexdigest()
            self.path = LOG_DIR / f"literal_{literal_digest[:10]}.txt"
            self.path.write_text(canonical, encoding="utf-8")
        else:
            self.path = Path(file_path)
            if not self.path.exists():
                raise FileNotFoundError(self.path)

        path_digest = hashlib.sha1(str(self.path).encode()).hexdigest()
        self.hash = path_digest[:10]
        self._normalized_path = str(self.path.resolve())
        self._released = False

        _increment_active(self.path)
        if _is_in_log_dir(self.path):
            register_log_file(self.path)
            register_reference(self.path)

        # The finalizer receives only the path string, never the instance.
        self._finalizer = weakref.finalize(
            self, _finalize_instance, self._normalized_path
        )

    def __repr__(self):
        """Return a short tag containing the backing file's name."""
        filename = self.path.name
        return f"<StreamNumber {filename}>"

    def stream(self, chunk_size=4096):
        """Yield the file content piece by piece.

        Each piece of up to *chunk_size* characters is stripped of
        surrounding whitespace and has commas replaced by dots.
        """
        if _is_in_log_dir(self.path):
            touch_log_file(self.path)
        with open(self.path, "r", encoding="utf-8") as handle:
            # read() returns "" at end of file, which stops the iteration.
            for piece in iter(lambda: handle.read(chunk_size), ""):
                yield piece.strip().replace(",", ".")

    def write_stage(self, stage, data: str):
        """Store *data* as the stage file for *stage* and return its path."""
        _ensure_log_dir()
        target = LOG_DIR / f"{self.hash}_stage_{stage}.bin"
        with target.open("wb") as sink:
            sink.write(data.encode())
        register_log_file(target)
        return target

    def free(self, *, delete_file: bool = True) -> None:
        """Drop this instance's reference; repeated calls do nothing.

        Args:
            delete_file: Forwarded to the reference release for the file.
        """
        if not self._released:
            self._released = True
            finalizer = self._finalizer
            if finalizer.alive:
                # Explicit release replaces the automatic one.
                finalizer.detach()
            _decrement_active(Path(self._normalized_path), delete_file=delete_file)

    def __enter__(self):
        """Return the instance itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Free the instance when the ``with`` block ends."""
        self.free()

    def __add__(self, other):
        rhs = _coerce_operand(other)
        return add(self, rhs)

    def __sub__(self, other):
        rhs = _coerce_operand(other)
        return sub(self, rhs)

    def __mul__(self, other):
        rhs = _coerce_operand(other)
        return mul(self, rhs)

    def __truediv__(self, other):
        rhs = _coerce_operand(other)
        return div(self, rhs)

    def __mod__(self, other):
        rhs = _coerce_operand(other)
        return mod(self, rhs)

    def __pow__(self, other):
        rhs = _coerce_operand(other)
        return pow(self, rhs)

    # Reflected variants: the coerced operand goes on the left-hand side.

    def __radd__(self, other):
        lhs = _coerce_operand(other)
        return add(lhs, self)

    def __rsub__(self, other):
        lhs = _coerce_operand(other)
        return sub(lhs, self)

    def __rmul__(self, other):
        lhs = _coerce_operand(other)
        return mul(lhs, self)

    def __rtruediv__(self, other):
        lhs = _coerce_operand(other)
        return div(lhs, self)

    def __rmod__(self, other):
        lhs = _coerce_operand(other)
        return mod(lhs, self)

    def __rpow__(self, other):
        lhs = _coerce_operand(other)
        return pow(lhs, self)
# Count of active StreamNumber instances, keyed by resolved file path.
_ACTIVE_COUNTER: Counter[str] = Counter()
def _increment_active(path: Path) -> None:
    """Add one active reference for the resolved form of *path*."""
    resolved = path.resolve()
    _ACTIVE_COUNTER.update([str(resolved)])
def _decrement_active(path: Path, delete_file: bool = True) -> None:
    """Remove one active reference for *path*.

    The counter entry is dropped once no reference remains. For a path
    inside the log directory the reference is also released, with
    *delete_file* forwarded to that release.
    """
    key = str(path.resolve())
    remaining = _ACTIVE_COUNTER.get(key, 0) - 1
    if remaining > 0:
        _ACTIVE_COUNTER[key] = remaining
    else:
        # Also covers a path that was never counted.
        _ACTIVE_COUNTER.pop(key, None)

    if _is_in_log_dir(path):
        release_reference(path, delete_file=delete_file)
def _finalize_instance(path_str: str) -> None:
    """Finalizer callback: release the reference unless manual-only mode is on."""
    if not _MANUAL_FREE_ONLY:
        _decrement_active(Path(path_str))
def free_stream(number: StreamNumber, *, delete_file: bool = True) -> None:
    """Free *number*, forwarding *delete_file* to its ``free()`` method."""
    release = number.free
    release(delete_file=delete_file)
def active_streams() -> Dict[str, int]:
    """Return a plain-dict copy of the per-path active reference counts."""
    snapshot: Dict[str, int] = {}
    snapshot.update(_ACTIVE_COUNTER)
    return snapshot
def set_manual_free_only(enabled: bool) -> None:
    """Switch manual-free-only mode on or off based on the truth of *enabled*."""
    global _MANUAL_FREE_ONLY
    _MANUAL_FREE_ONLY = True if enabled else False
def manual_free_only_enabled() -> bool:
    """Report whether manual-free-only mode is currently switched on."""
    current = _MANUAL_FREE_ONLY
    return current
def _coerce_operand(value: Any) -> StreamNumber:
    """Convert supported operand types into a StreamNumber.

    Args:
        value: A ``StreamNumber`` (returned unchanged), an ``int`` (stored
            as a literal), a ``Path`` (opened as a number file), or a ``str``
            (used as a file path when that path exists, otherwise as an
            integer literal).

    Returns:
        A ``StreamNumber`` for *value*.

    Raises:
        TypeError: If *value* has an unsupported type.
        ValueError: Propagated from ``StreamNumber`` when a string is neither
            an existing path nor a valid integer literal.
        FileNotFoundError: Propagated from ``StreamNumber`` when a ``Path``
            operand does not exist.
    """
    if isinstance(value, StreamNumber):
        return value
    if isinstance(value, int):
        # int() normalizes int subclasses whose str() is not a digit string
        # (bool gives "True"; IntEnum gives the member name before 3.11).
        return StreamNumber(literal=str(int(value)))
    if isinstance(value, Path):
        return StreamNumber(value)
    if isinstance(value, str):
        candidate = Path(value)
        try:
            is_existing_path = candidate.exists()
        except (OSError, ValueError):
            # A long digit string is not a usable file name: the stat call
            # behind exists() can fail (e.g. with ENAMETOOLONG) instead of
            # answering False. Such text can only be meant as a literal.
            is_existing_path = False
        if is_existing_path:
            return StreamNumber(candidate)
        return StreamNumber(literal=value)
    raise TypeError(f"Unsupported operand type for StreamNumber: {type(value)!r}")