mathy/mathstream/number.py
2025-11-05 16:35:15 +01:00

226 lines
6.6 KiB
Python

import hashlib
import weakref
from collections import Counter
from pathlib import Path
from typing import Dict, Optional, Union, Any
from .utils import (
register_log_file,
register_reference,
touch_log_file,
release_reference,
)
# Directory (relative to the current working directory) where literal files
# and intermediate stage outputs are written.
LOG_DIR = Path("./instance/log")
# When True, the weakref finalizer skips cleanup, so a stream's reference is
# released only through an explicit .free() call.
_MANUAL_FREE_ONLY = False
def _ensure_log_dir() -> None:
    """Create ``LOG_DIR`` and any missing parent directories.

    Does nothing when the directory already exists.
    """
    LOG_DIR.mkdir(exist_ok=True, parents=True)
def _canonicalize_literal(value: str) -> str:
raw = value.strip()
if not raw:
raise ValueError("Literal value cannot be empty.")
sign = ""
digits = raw
if raw[0] in "+-":
sign = "-" if raw[0] == "-" else ""
digits = raw[1:]
if not digits or not digits.isdigit():
raise ValueError(f"Literal must be an integer string, got: {value!r}")
digits = digits.lstrip("0") or "0"
if digits == "0":
sign = ""
return f"{sign}{digits}"
def _is_in_log_dir(path: Path) -> bool:
    """Report whether *path* resolves to ``LOG_DIR`` or a location beneath it.

    Both paths are resolved before comparison, so relative paths are
    compared by their absolute form.
    """
    try:
        candidate = path.resolve()
        log_root = LOG_DIR.resolve()
        candidate.relative_to(log_root)
    except ValueError:
        # relative_to() raises ValueError when candidate is not under log_root.
        return False
    else:
        return True
class StreamNumber:
    """A number stored in a file and consumed in chunks.

    An instance is created either from an existing file or from an integer
    literal, which is canonicalized and written to a file under ``LOG_DIR``.
    Each instance bumps a per-path active counter; the count is dropped again
    by :meth:`free`, by leaving a ``with`` block, or by the weakref finalizer.

    Arithmetic operators delegate to the ``engine`` module through
    ``_apply_binary_op``.
    """

    def __init__(
        self,
        file_path: Optional[Union[str, Path]] = None,
        *,
        literal: Optional[str] = None,
    ):
        """Bind the instance to a backing file.

        Args:
            file_path: Path of an existing file holding the number.
            literal: Integer literal text; written to
                ``LOG_DIR/literal_<hash>.txt`` after canonicalization.

        Raises:
            ValueError: If both or neither of the arguments are given, or
                the literal is not a valid integer string.
            FileNotFoundError: If ``file_path`` does not exist.
        """
        if (file_path is None) == (literal is None):
            raise ValueError("Provide exactly one of file_path or literal.")

        if literal is not None:
            normalized = _canonicalize_literal(literal)
            _ensure_log_dir()
            # The file name is derived from the canonical text, so equal
            # literals share one backing file.
            literal_hash = hashlib.sha1(normalized.encode()).hexdigest()[:10]
            self.path = LOG_DIR / f"literal_{literal_hash}.txt"
            self.path.write_text(normalized, encoding="utf-8")
        else:
            self.path = Path(file_path)
            if not self.path.exists():
                raise FileNotFoundError(self.path)

        # Short identifier derived from the path as given (not resolved);
        # used to name stage files.
        self.hash = hashlib.sha1(str(self.path).encode()).hexdigest()[:10]
        self._normalized_path = str(self.path.resolve())
        self._released = False

        _increment_active(self.path)
        if _is_in_log_dir(self.path):
            register_log_file(self.path)
            register_reference(self.path)

        # The finalizer receives only the path string, so it holds no
        # reference back to this instance.
        self._finalizer = weakref.finalize(
            self, _finalize_instance, self._normalized_path
        )

    def __repr__(self):
        return f"<StreamNumber {self.path.name}>"

    def stream(self, chunk_size=4096):
        """Yield the file content as cleaned, non-empty text chunks.

        Each chunk of up to ``chunk_size`` characters is stripped of leading
        and trailing whitespace and has ``","`` replaced by ``"."``. Chunks
        that are empty after stripping (for example a trailing newline that
        falls into a chunk of its own) are skipped rather than yielded.

        This is a generator: the file is opened, and ``touch_log_file`` is
        called for files inside ``LOG_DIR``, only once iteration starts.
        """
        if _is_in_log_dir(self.path):
            touch_log_file(self.path)
        with open(self.path, "r", encoding="utf-8") as f:
            while chunk := f.read(chunk_size):
                cleaned = chunk.strip().replace(",", ".")
                if cleaned:
                    yield cleaned

    def write_stage(self, stage, data: str):
        """Write an intermediate stage result and return its path.

        The data is encoded with the default ``str.encode()`` encoding and
        written to ``LOG_DIR/<hash>_stage_<stage>.bin``; the file is then
        passed to ``register_log_file``.
        """
        _ensure_log_dir()
        stage_file = LOG_DIR / f"{self.hash}_stage_{stage}.bin"
        with open(stage_file, "wb") as f:
            f.write(data.encode())
        register_log_file(stage_file)
        return stage_file

    def free(self, *, delete_file: bool = True) -> None:
        """Release this stream's reference; safe to call more than once.

        Args:
            delete_file: Forwarded to ``_decrement_active`` and from there to
                ``release_reference`` for files inside ``LOG_DIR``.
        """
        if self._released:
            return
        self._released = True
        # Detach so the finalizer cannot decrement a second time later.
        if self._finalizer.alive:
            self._finalizer.detach()
        _decrement_active(Path(self._normalized_path), delete_file=delete_file)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Returns None, so an exception raised inside the block propagates.
        self.free()

    def _binary(self, op_name: str, other: Any, *, reflected: bool = False):
        """Coerce *other* and dispatch one binary operation.

        Returns ``NotImplemented`` for operand types that cannot be coerced,
        so Python can try the other operand's reflected method and otherwise
        raise its standard ``TypeError``.
        """
        # Keep this tuple in sync with the types _coerce_operand accepts.
        if not isinstance(other, (StreamNumber, int, Path, str)):
            return NotImplemented
        operand = _coerce_operand(other)
        if reflected:
            return _apply_binary_op(op_name, operand, self)
        return _apply_binary_op(op_name, self, operand)

    def __add__(self, other):
        return self._binary("add", other)

    def __sub__(self, other):
        return self._binary("sub", other)

    def __mul__(self, other):
        return self._binary("mul", other)

    def __truediv__(self, other):
        return self._binary("div", other)

    def __mod__(self, other):
        return self._binary("mod", other)

    def __pow__(self, other):
        return self._binary("pow", other)

    def __radd__(self, other):
        return self._binary("add", other, reflected=True)

    def __rsub__(self, other):
        return self._binary("sub", other, reflected=True)

    def __rmul__(self, other):
        return self._binary("mul", other, reflected=True)

    def __rtruediv__(self, other):
        return self._binary("div", other, reflected=True)

    def __rmod__(self, other):
        return self._binary("mod", other, reflected=True)

    def __rpow__(self, other):
        return self._binary("pow", other, reflected=True)
# Count of StreamNumber instances not yet freed or finalized, keyed by the
# resolved path string of their backing file.
_ACTIVE_COUNTER: Counter[str] = Counter()
def _increment_active(path: Path) -> None:
    """Add one to the active count stored under the resolved form of *path*."""
    resolved_key = str(path.resolve())
    _ACTIVE_COUNTER[resolved_key] = _ACTIVE_COUNTER[resolved_key] + 1
def _decrement_active(path: Path, delete_file: bool = True) -> None:
    """Drop one active reference for *path* and release its log-dir reference.

    The counter entry is removed once the count would fall to zero or below.
    For paths inside ``LOG_DIR``, ``release_reference`` is called with the
    given ``delete_file`` flag regardless of the remaining count.
    """
    resolved_key = str(path.resolve())
    remaining = _ACTIVE_COUNTER.get(resolved_key, 0) - 1
    if remaining > 0:
        _ACTIVE_COUNTER[resolved_key] = remaining
    else:
        _ACTIVE_COUNTER.pop(resolved_key, None)

    # NOTE(review): StreamNumber passes its resolved path here, while
    # register_reference() received the path as originally given. Presumably
    # the utils helpers normalize paths themselves - confirm.
    if _is_in_log_dir(path):
        release_reference(path, delete_file=delete_file)
def _finalize_instance(path_str: str) -> None:
    """Finalizer callback registered by ``StreamNumber`` via ``weakref.finalize``.

    Decrements the active count for *path_str* unless manual-free-only mode
    is enabled at the moment the callback runs.
    """
    if not _MANUAL_FREE_ONLY:
        _decrement_active(Path(path_str))
def free_stream(number: StreamNumber, *, delete_file: bool = True) -> None:
    """Function-style equivalent of ``number.free(delete_file=delete_file)``."""
    release = number.free
    release(delete_file=delete_file)
def active_streams() -> Dict[str, int]:
    """Return a plain-dict snapshot of resolved path -> active instance count.

    The result is a copy; changing it does not affect the internal counter.
    """
    snapshot: Dict[str, int] = {}
    snapshot.update(_ACTIVE_COUNTER)
    return snapshot
def set_manual_free_only(enabled: bool) -> None:
    """Switch manual-free-only mode on or off.

    While enabled, the weakref finalizer performs no cleanup, so references
    are released only through explicit ``.free()`` calls. Any truthy or falsy
    value is accepted and stored as a real ``bool``.
    """
    global _MANUAL_FREE_ONLY
    _MANUAL_FREE_ONLY = True if enabled else False
def manual_free_only_enabled() -> bool:
    """Report whether manual-free-only mode is currently switched on."""
    enabled = _MANUAL_FREE_ONLY
    return enabled
def _coerce_operand(value: Any) -> StreamNumber:
    """Convert a supported operand into a ``StreamNumber``.

    Accepted inputs, checked in this order:

    * ``StreamNumber`` - returned unchanged.
    * ``int`` (including ``bool`` and other ``int`` subclasses) - converted
      via its integer value into a literal-backed instance.
    * ``Path`` - opened as a file-backed instance.
    * ``str`` - treated as a file path if such a path exists, otherwise
      parsed as an integer literal.

    Raises:
        TypeError: For any other type.
        ValueError: If a string (that is not an existing path) is not a
            valid integer literal.
        FileNotFoundError: If a ``Path`` operand does not exist.
    """
    if isinstance(value, StreamNumber):
        return value
    if isinstance(value, int):
        # Go through int() first: bool and enum-style int subclasses may
        # stringify as names ("True") rather than digits, which would fail
        # literal parsing with a misleading error.
        return StreamNumber(literal=str(int(value)))
    if isinstance(value, Path):
        return StreamNumber(value)
    if isinstance(value, str):
        candidate = Path(value)
        # An existing file takes precedence over interpreting the text as a
        # literal.
        if candidate.exists():
            return StreamNumber(candidate)
        return StreamNumber(literal=value)
    raise TypeError(f"Unsupported operand type for StreamNumber: {type(value)!r}")
def _apply_binary_op(
    op_name: str, left: StreamNumber, right: StreamNumber
) -> StreamNumber:
    """Look up ``op_name`` on the sibling ``engine`` module and call it.

    ``engine`` is imported inside the function rather than at module level.

    Raises:
        AttributeError: If ``engine`` has no attribute named ``op_name``.
    """
    from . import engine

    return getattr(engine, op_name)(left, right)