226 lines
6.6 KiB
Python
226 lines
6.6 KiB
Python
import hashlib
|
|
import weakref
|
|
from collections import Counter
|
|
from pathlib import Path
|
|
from typing import Dict, Optional, Union, Any
|
|
|
|
from .utils import (
|
|
register_log_file,
|
|
register_reference,
|
|
touch_log_file,
|
|
release_reference,
|
|
)
|
|
|
|
# Directory that holds literal-backed number files and per-stream stage files.
LOG_DIR: Path = Path("./instance/log")
# When True, the garbage-collection finalizer skips releasing references, so
# only explicit ``.free()`` calls release them (see set_manual_free_only()).
_MANUAL_FREE_ONLY: bool = False
|
|
|
|
|
|
def _ensure_log_dir() -> None:
    """Make sure LOG_DIR exists, creating any missing parent directories.

    Safe to call repeatedly: an existing directory is left as-is.
    """
    LOG_DIR.mkdir(exist_ok=True, parents=True)
|
|
|
|
|
|
def _canonicalize_literal(value: str) -> str:
    """Normalize an integer literal string to canonical decimal form.

    Surrounding whitespace is stripped, an optional leading ``+`` is dropped,
    leading zeros are removed, and ``-0`` collapses to ``0``.

    >>> _canonicalize_literal("  -007 ")
    '-7'

    Raises:
        ValueError: if *value* is blank, or is not an optionally-signed run of
            ASCII digits ``0``-``9``.
    """
    raw = value.strip()
    if not raw:
        raise ValueError("Literal value cannot be empty.")

    sign = ""
    digits = raw
    if raw[0] in "+-":
        # A "+" sign carries no information in canonical form.
        sign = "-" if raw[0] == "-" else ""
        digits = raw[1:]

    # str.isdigit() alone also accepts non-ASCII digits such as "²" or "٣".
    # Those are not decimal integer literals, and the zero-stripping below
    # only understands ASCII "0", so require plain ASCII as well.
    if not digits or not (digits.isascii() and digits.isdigit()):
        raise ValueError(f"Literal must be an integer string, got: {value!r}")

    digits = digits.lstrip("0") or "0"
    if digits == "0":
        # Zero has no sign in canonical form ("-0" -> "0").
        sign = ""
    return f"{sign}{digits}"
|
|
|
|
|
|
def _is_in_log_dir(path: Path) -> bool:
    """Tell whether *path*, once resolved, is LOG_DIR itself or lies beneath it."""
    try:
        resolved = path.resolve()
        resolved.relative_to(LOG_DIR.resolve())
    except ValueError:
        inside = False
    else:
        inside = True
    return inside
|
|
|
|
|
|
class StreamNumber:
    """An integer whose decimal text lives in a file and is read in chunks.

    Build one from either an existing ``file_path`` or an integer ``literal``
    string. A literal is canonicalized and written under ``LOG_DIR``, and
    equal literals map to the same file. Every live instance is counted in
    the module's active-stream counter and, for files under ``LOG_DIR``,
    registered with the ``.utils`` helpers.

    Release an instance with :meth:`free` (or a ``with`` block). Otherwise a
    weakref finalizer releases it when the object is collected, unless
    manual-free-only mode is enabled.

    Arithmetic operators coerce the other operand (StreamNumber, int, str or
    Path) and delegate to the same-named function in the ``engine`` module.
    """

    def __init__(
        self,
        file_path: Optional[Union[str, Path]] = None,
        *,
        literal: Optional[str] = None,
    ):
        """Wrap ``file_path``, or materialize ``literal`` into a file under LOG_DIR.

        Raises:
            ValueError: if both or neither of ``file_path``/``literal`` are
                given, or if ``literal`` is not an integer string.
            FileNotFoundError: if ``file_path`` does not exist.
        """
        if (file_path is None) == (literal is None):
            raise ValueError("Provide exactly one of file_path or literal.")

        if literal is not None:
            normalized = _canonicalize_literal(literal)
            _ensure_log_dir()
            # The file name derives from the canonical value, so equal
            # literals share a single backing file.
            literal_hash = hashlib.sha1(normalized.encode()).hexdigest()[:10]
            self.path = LOG_DIR / f"literal_{literal_hash}.txt"
            self.path.write_text(normalized, encoding="utf-8")
        else:
            self.path = Path(file_path)
            if not self.path.exists():
                raise FileNotFoundError(self.path)

        # Short id used to name this stream's stage files (see write_stage).
        self.hash = hashlib.sha1(str(self.path).encode()).hexdigest()[:10]
        self._normalized_path = str(self.path.resolve())
        self._released = False

        _increment_active(self.path)

        if _is_in_log_dir(self.path):
            register_log_file(self.path)
            register_reference(self.path)

        # The callback receives only the path string, not ``self``, so the
        # finalizer does not keep this instance alive.
        self._finalizer = weakref.finalize(
            self, _finalize_instance, self._normalized_path
        )

    def __repr__(self):
        return f"<StreamNumber {self.path.name}>"

    def stream(self, chunk_size=4096):
        """Yield the file's text in chunks of up to ``chunk_size`` characters.

        Each chunk is stripped of surrounding whitespace and has ``","``
        replaced by ``"."``. For files under LOG_DIR, ``touch_log_file`` is
        called when iteration starts.
        """
        if _is_in_log_dir(self.path):
            touch_log_file(self.path)
        with open(self.path, "r", encoding="utf-8") as f:
            while chunk := f.read(chunk_size):
                yield chunk.strip().replace(",", ".")

    def write_stage(self, stage, data: str):
        """Write ``data`` (UTF-8) to ``LOG_DIR/<hash>_stage_<stage>.bin``.

        The file is registered via ``register_log_file``, and its path is
        returned.
        """
        _ensure_log_dir()
        stage_file = LOG_DIR / f"{self.hash}_stage_{stage}.bin"
        with open(stage_file, "wb") as f:
            f.write(data.encode())
        register_log_file(stage_file)
        return stage_file

    def free(self, *, delete_file: bool = True) -> None:
        """Release this stream's reference and optionally delete the staged file.

        Idempotent: calls after the first are no-ops. The GC finalizer is
        detached so the reference is not released a second time.
        ``delete_file`` is forwarded to ``release_reference`` for files under
        LOG_DIR.
        """
        if self._released:
            return
        self._released = True
        if self._finalizer.alive:
            self._finalizer.detach()
        _decrement_active(Path(self._normalized_path), delete_file=delete_file)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Returns None, so exceptions raised in the ``with`` body propagate.
        self.free()

    def _binary_op(self, op_name, other, *, reflected=False):
        """Coerce ``other`` and apply the engine's ``op_name`` to both operands.

        Returns ``NotImplemented`` for operand types ``_coerce_operand`` does
        not accept. Python can then try the other operand's reflected method,
        or raise its standard ``TypeError``.

        NOTE(review): ints/strs/Paths coerced here become new StreamNumber
        instances that the caller never sees. They are released only by their
        finalizer, which is a no-op in manual-free-only mode. Confirm whether
        the engine keeps references to its operands before freeing them here.
        """
        if not isinstance(other, (StreamNumber, int, str, Path)):
            return NotImplemented
        operand = _coerce_operand(other)
        if reflected:
            return _apply_binary_op(op_name, operand, self)
        return _apply_binary_op(op_name, self, operand)

    def __add__(self, other):
        return self._binary_op("add", other)

    def __sub__(self, other):
        return self._binary_op("sub", other)

    def __mul__(self, other):
        return self._binary_op("mul", other)

    def __truediv__(self, other):
        return self._binary_op("div", other)

    def __mod__(self, other):
        return self._binary_op("mod", other)

    def __pow__(self, other):
        return self._binary_op("pow", other)

    def __radd__(self, other):
        return self._binary_op("add", other, reflected=True)

    def __rsub__(self, other):
        return self._binary_op("sub", other, reflected=True)

    def __rmul__(self, other):
        return self._binary_op("mul", other, reflected=True)

    def __rtruediv__(self, other):
        return self._binary_op("div", other, reflected=True)

    def __rmod__(self, other):
        return self._binary_op("mod", other, reflected=True)

    def __rpow__(self, other):
        return self._binary_op("pow", other, reflected=True)
|
|
|
|
# Resolved path -> number of live StreamNumber instances backed by that file.
# The annotation is a string because module-level annotations are evaluated
# at import time, and ``collections.Counter[str]`` raises TypeError before
# Python 3.9 (PEP 585).
_ACTIVE_COUNTER: "Counter[str]" = Counter()
|
|
|
|
|
|
def _increment_active(path: Path) -> None:
    """Add one live-instance count for *path*, keyed by its resolved string form."""
    _ACTIVE_COUNTER.update((str(path.resolve()),))
|
|
|
|
|
|
def _decrement_active(path: Path, delete_file: bool = True) -> None:
    """Drop one live-instance count for *path*.

    The counter entry is removed once it would reach zero or below. For paths
    under LOG_DIR, ``release_reference`` is also called with ``delete_file``.
    """
    counter_key = str(path.resolve())
    remaining = _ACTIVE_COUNTER.get(counter_key, 0) - 1
    if remaining > 0:
        _ACTIVE_COUNTER[counter_key] = remaining
    else:
        _ACTIVE_COUNTER.pop(counter_key, None)

    if not _is_in_log_dir(path):
        return
    release_reference(path, delete_file=delete_file)
|
|
|
|
|
|
def _finalize_instance(path_str: str) -> None:
    """weakref.finalize callback that releases a collected instance's reference.

    Does nothing while manual-free-only mode is on, leaving the release to
    explicit ``.free()`` calls.
    """
    if not _MANUAL_FREE_ONLY:
        _decrement_active(Path(path_str))
|
|
|
|
|
|
def free_stream(number: StreamNumber, *, delete_file: bool = True) -> None:
    """Release *number*; a functional spelling of ``number.free(delete_file=...)``."""
    release = number.free
    release(delete_file=delete_file)
|
|
|
|
|
|
def active_streams() -> Dict[str, int]:
    """Return a plain-dict snapshot of resolved path -> live StreamNumber count.

    The result is a copy, so mutating it does not affect the internal counter.
    """
    return {stream_path: count for stream_path, count in _ACTIVE_COUNTER.items()}
|
|
|
|
|
|
def set_manual_free_only(enabled: bool) -> None:
    """Turn manual-free-only mode on or off.

    While it is on, the finalizer that runs when a StreamNumber is
    garbage-collected does not release the instance's reference. Only an
    explicit ``.free()`` does. *enabled* is stored by its truthiness.
    """
    global _MANUAL_FREE_ONLY
    _MANUAL_FREE_ONLY = True if enabled else False
|
|
|
|
|
|
def manual_free_only_enabled() -> bool:
    """Report whether manual-free-only mode (finalizer release suppressed) is on."""
    current = _MANUAL_FREE_ONLY
    return current
|
|
|
|
|
|
def _coerce_operand(value: Any) -> StreamNumber:
    """Convert a supported operand into a StreamNumber.

    Supported inputs:
      * StreamNumber: returned unchanged.
      * int, including subclasses such as bool or IntEnum: wrapped as a literal.
      * pathlib.Path: wrapped as a file-backed stream.
      * str: used as a file path when it names an existing regular file,
        otherwise parsed as an integer literal.

    Raises:
        TypeError: for any other operand type.
        ValueError: when a string cannot be parsed as an integer literal.
        FileNotFoundError: when a Path does not exist.
    """
    if isinstance(value, StreamNumber):
        return value
    if isinstance(value, int):
        # Normalize through int(): subclasses may override __str__. For example
        # str(True) is "True", and str() of an IntEnum member is "Cls.NAME"
        # before Python 3.11; neither parses as an integer literal.
        return StreamNumber(literal=str(int(value)))
    if isinstance(value, Path):
        return StreamNumber(value)
    if isinstance(value, str):
        candidate = Path(value)
        # is_file() rather than exists(): Path("") is Path("."), which always
        # exists, and a directory cannot be streamed. Such strings now go
        # through literal parsing and fail with a clear ValueError.
        if candidate.is_file():
            return StreamNumber(candidate)
        return StreamNumber(literal=value)
    raise TypeError(f"Unsupported operand type for StreamNumber: {type(value)!r}")
|
|
|
|
|
|
def _apply_binary_op(
    op_name: str, left: StreamNumber, right: StreamNumber
) -> StreamNumber:
    """Call the ``engine`` function named ``op_name`` with ``(left, right)``.

    ``engine`` is imported at call time, presumably to avoid a circular import
    with this module; TODO confirm.
    """
    from . import engine

    return getattr(engine, op_name)(left, right)
|