lol streamed-math REPL

This commit is contained in:
Dominik Krenn 2025-11-05 16:35:15 +01:00
parent f6eee30f17
commit cbc7bf403e
6 changed files with 496 additions and 15 deletions

View File

@ -58,6 +58,7 @@ Available operations:
- Environment helpers: `clear_logs`, `StreamNumber.write_stage`, `engine.LOG_DIR`
- Python sugar: `StreamNumber` implements `+`, `-`, `*`, `/`, `%`, `**`, and their reflected counterparts. Raw `int`, `str`, or `pathlib.Path` operands are coerced automatically.
- Context manager support: `with StreamNumber(...) as sn:` ensures `.free()` is called at exit.
- Module entry point: `python -m mathstream` launches the interactive streamed-math REPL from `stream_repl.py`.
## How It Works
@ -94,6 +95,7 @@ Available operations:
- `collatz.py` / `collatz_ui/` Curses dashboard that streams Collatz sequences.
- `seed_start.py` Seeds `start.txt` via streamed additions from various sources.
- `find_my.py` + `pi_finder/` Nilakantha-based π explorer that writes results to `found.pi`.
- `stream_repl.py` / `python -m mathstream` Interactive REPL for streamed math (supports `save <var> <path>`, `:show`, `:purge`, `:cleanmode`, `:stats`, and `exit -s` to keep staging files).
- `WORK.md` Deep dive into architecture (Logger DB schema, reference lifetimes, cleanup flow).
- `collatz_ui/views.py` Reference implementation of a threaded worker that coordinates streamed math and curses rendering without blocking.
- `pi_finder/engine.py` Example of building high-precision algorithms (Nilakantha π) purely via streamed primitives, including manual caching of million-digit scale factors.

View File

@ -7,12 +7,13 @@ from .number import (
set_manual_free_only,
manual_free_only_enabled,
)
from .utils import collect_garbage, tracked_files
from .utils import collect_garbage, tracked_files, instance_stats
__all__ = [
"clear_logs",
"collect_garbage",
"tracked_files",
"instance_stats",
"add",
"sub",
"mul",

11
mathstream/__main__.py Normal file
View File

@ -0,0 +1,11 @@
from __future__ import annotations
from . import repl
def main() -> None:
repl.run()
if __name__ == "__main__":
main()

View File

@ -3,7 +3,6 @@ import weakref
from collections import Counter
from pathlib import Path
from typing import Dict, Optional, Union, Any
from .engine import add, sub, mul, div, mod, pow
from .utils import (
register_log_file,
@ -119,40 +118,40 @@ class StreamNumber:
self.free()
def __add__(self, other):
return add(self, _coerce_operand(other))
return _apply_binary_op("add", self, _coerce_operand(other))
def __sub__(self, other):
return sub(self, _coerce_operand(other))
return _apply_binary_op("sub", self, _coerce_operand(other))
def __mul__(self, other):
return mul(self, _coerce_operand(other))
return _apply_binary_op("mul", self, _coerce_operand(other))
def __truediv__(self, other):
return div(self, _coerce_operand(other))
return _apply_binary_op("div", self, _coerce_operand(other))
def __mod__(self, other):
return mod(self, _coerce_operand(other))
return _apply_binary_op("mod", self, _coerce_operand(other))
def __pow__(self, other):
return pow(self, _coerce_operand(other))
return _apply_binary_op("pow", self, _coerce_operand(other))
def __radd__(self, other):
return add(_coerce_operand(other), self)
return _apply_binary_op("add", _coerce_operand(other), self)
def __rsub__(self, other):
return sub(_coerce_operand(other), self)
return _apply_binary_op("sub", _coerce_operand(other), self)
def __rmul__(self, other):
return mul(_coerce_operand(other), self)
return _apply_binary_op("mul", _coerce_operand(other), self)
def __rtruediv__(self, other):
return div(_coerce_operand(other), self)
return _apply_binary_op("div", _coerce_operand(other), self)
def __rmod__(self, other):
return mod(_coerce_operand(other), self)
return _apply_binary_op("mod", _coerce_operand(other), self)
def __rpow__(self, other):
return pow(_coerce_operand(other), self)
return _apply_binary_op("pow", _coerce_operand(other), self)
_ACTIVE_COUNTER: Counter[str] = Counter()
@ -215,3 +214,12 @@ def _coerce_operand(value: Any) -> StreamNumber:
return StreamNumber(candidate)
return StreamNumber(literal=value)
raise TypeError(f"Unsupported operand type for StreamNumber: {type(value)!r}")
def _apply_binary_op(
op_name: str, left: StreamNumber, right: StreamNumber
) -> StreamNumber:
from . import engine
func = getattr(engine, op_name)
return func(left, right)

421
mathstream/repl.py Normal file
View File

@ -0,0 +1,421 @@
from __future__ import annotations
import argparse
import ast
from pathlib import Path
from typing import Dict, Optional
class SkipCleanupExit(Exception):
"""Signal that the REPL should exit without wiping the staging directory."""
from . import (
StreamNumber,
add,
sub,
mul,
div,
mod,
pow,
free_stream,
clear_logs,
tracked_files,
active_streams,
set_manual_free_only,
manual_free_only_enabled,
instance_stats,
)
Prompt = "mathstream> "
def _cleanup_mode_label(manual_only: bool) -> str:
return "manual (auto cleanup off)" if manual_only else "auto (auto cleanup on)"
def run(argv: Optional[list[str]] = None) -> None:
parser = argparse.ArgumentParser(
description="Interactive shell for mathstream streamed arithmetic."
)
parser.add_argument(
"--cleanmode",
choices=("auto", "manual"),
default="manual",
help=(
"Choose how staged files are cleaned up: 'auto' deletes them when streams "
"fall out of scope; 'manual' keeps them until freed or purged."
),
)
parser.add_argument(
"--manual-free",
dest="deprecated_manual_free",
action="store_true",
help=argparse.SUPPRESS,
)
parser.add_argument(
"--preview",
type=int,
default=80,
help="Digits to preview when printing results (default: 80).",
)
args = parser.parse_args(argv)
manual_mode = args.cleanmode == "manual"
if getattr(args, "deprecated_manual_free", False):
manual_mode = True
print("[warn] --manual-free is deprecated; use '--cleanmode manual' instead.")
set_manual_free_only(manual_mode)
env: Dict[str, StreamNumber] = {}
_greetings(manual_mode)
exit_code = 69
skip_cleanup = False
while True:
try:
line = input(Prompt)
except EOFError:
print()
exit_code = 42
break
except KeyboardInterrupt:
print()
continue
stripped = line.strip()
if not stripped:
continue
if stripped.startswith("save "):
parts = stripped.split(maxsplit=2)
if len(parts) != 3:
print("usage: save name filename")
else:
_, name, filename = parts
_save_variable(env, name, filename)
continue
try:
if _handle_command(stripped, env):
continue
except SkipCleanupExit:
skip_cleanup = True
exit_code = 0
break
except EOFError:
exit_code = 621
break
try:
result = _evaluate_line(line, env)
except Exception as exc:
print(f"[error] {exc}")
continue
if result is None:
continue
env["_"] = result
preview = _preview_digits(result, args.preview)
digits_count = sum(1 for ch in preview if ch.isdigit())
print(f"[result] {result.path} ({digits_count} digits previewed)")
print(
f" {preview}"
f"{'' if _has_more_digits(result, args.preview) else ''}"
)
_shutdown(env, exit_code, skip_cleanup)
def _greetings(manual_only: bool) -> None:
print("mathstream REPL ready. type ':help' for commands.")
print(f"cleanup mode: {_cleanup_mode_label(manual_only)}")
def _handle_command(line: str, env: Dict[str, StreamNumber]) -> bool:
if line == "exit -s":
raise SkipCleanupExit
if line in {"quit", "exit"}:
raise EOFError
if not line.startswith(":"):
return False
parts = line[1:].split()
if not parts:
return True
cmd, *rest = parts
if cmd in {"h", "help"}:
print(
"commands:\n"
" :vars list stored variables\n"
" :show name [-f] preview digits for a variable; use -f to print full value\n"
" :save name path write the digits for a variable to a file (relative to cwd)\n"
" :free name free a variable and delete its staged file\n"
" :purge free all variables and clear logs directory\n"
" :cleanmode [mode] set cleanup mode to 'auto' or 'manual' (default manual)\n"
" :stats [path] summarize file counts and bytes (default: ./instance)\n"
" :tracked show tracked file counts from sqlite\n"
" :active show in-memory StreamNumber handles\n"
" :reload reload the sqlite tracker without clearing files\n"
" :help this message\n"
" :clear / :cls clear the terminal display\n"
" exit/quit leave the REPL"
)
elif cmd == "vars":
if not env:
print("(no variables)")
else:
for name, number in env.items():
print(f"{name:>8} -> {number.path}")
elif cmd == "show":
if not rest:
print("usage: :show name [-f]")
else:
name = rest[0]
force = "-f" in rest[1:]
number = env.get(name)
if number is None:
print(f"[warn] no variable named {name!r}")
else:
if force:
digits, _ = _collect_digits(number, limit=None)
print(f"{name} = {digits}")
else:
preview, truncated = _collect_digits(number, limit=100)
if truncated:
print(
f"[warn] {name} exceeds 100 digits. "
"Use ':show name -f' to print the full value."
)
else:
print(f"{name} = {preview}")
elif cmd == "save":
if len(rest) != 2:
print("usage: :save name path")
else:
name, filename = rest
_save_variable(env, name, filename)
elif cmd == "free":
if not rest:
print("usage: :free name")
else:
name = rest[0]
number = env.pop(name, None)
if number is None:
print(f"[warn] no variable named {name!r}")
else:
free_stream(number)
print(f"[ok] freed {name}")
elif cmd == "purge":
for number in env.values():
free_stream(number)
env.clear()
clear_logs()
print("[ok] cleared environment and staging directory")
elif cmd == "cleanmode":
if not rest:
mode = _cleanup_mode_label(manual_free_only_enabled())
print(f"[info] cleanup mode is {mode}")
elif len(rest) != 1 or rest[0].lower() not in {"auto", "manual"}:
print("usage: :cleanmode [auto|manual]")
else:
target_mode = rest[0].lower()
manual = target_mode == "manual"
set_manual_free_only(manual)
print(f"[ok] cleanup mode set to {_cleanup_mode_label(manual)}")
elif cmd == "stats":
target = Path(rest[0]).expanduser() if rest else Path("instance")
info = instance_stats(target)
print(f"[stats] root={info['root']}")
if not info["exists"]:
print(" (directory missing)")
else:
total_files = info["total_files"]
total_bytes = info["total_bytes"]
log_files = info["log_files"]
log_bytes = info["log_bytes"]
print(f" files: {total_files} total (log/: {log_files})")
print(f" bytes: {total_bytes} total (log/: {log_bytes})")
elif cmd in {"clear", "cls"}:
_clear_screen()
elif cmd == "tracked":
tracked = tracked_files()
if not tracked:
print("(no tracked files)")
else:
for path, count in tracked.items():
print(f"{path} (ref_count={count})")
elif cmd == "active":
active = active_streams()
if not active:
print("(no active streams)")
else:
for path, count in active.items():
print(f"{path} (python_refs={count})")
elif cmd == "reload":
print("tracker reload not implemented yet.")
else:
print(f"[warn] unknown command :{cmd}")
return True
def _evaluate_line(line: str, env: Dict[str, StreamNumber]) -> Optional[StreamNumber]:
try:
node = ast.parse(line, mode="exec")
except SyntaxError:
expr_node = ast.parse(line, mode="eval")
result = _eval_expr(expr_node.body, env)
return result
if len(node.body) != 1 or not isinstance(node.body[0], ast.Assign):
raise SyntaxError("only single assignment statements are allowed (e.g. x = 2 + 2)")
assign = node.body[0]
if len(assign.targets) != 1 or not isinstance(assign.targets[0], ast.Name):
raise SyntaxError("left-hand side must be a simple variable name")
target = assign.targets[0].id
result = _eval_expr(assign.value, env)
previous = env.pop(target, None)
if previous is not None:
free_stream(previous)
env[target] = result
print(f"[set] {target} = {result.path}")
return None
def _eval_expr(node: ast.AST, env: Dict[str, StreamNumber]) -> StreamNumber:
if isinstance(node, ast.BinOp):
left = _eval_expr(node.left, env)
right = _eval_expr(node.right, env)
if isinstance(node.op, ast.Add):
return add(left, right)
if isinstance(node.op, ast.Sub):
return sub(left, right)
if isinstance(node.op, ast.Mult):
return mul(left, right)
if isinstance(node.op, ast.Div):
return div(left, right)
if isinstance(node.op, ast.Mod):
return mod(left, right)
if isinstance(node.op, ast.Pow):
return pow(left, right)
raise TypeError(f"unsupported operator {ast.dump(node.op)}")
if isinstance(node, ast.UnaryOp):
operand = _eval_expr(node.operand, env)
if isinstance(node.op, ast.UAdd):
return operand
if isinstance(node.op, ast.USub):
zero = StreamNumber(literal="0")
result = sub(zero, operand)
zero.free()
return result
raise TypeError("only +x and -x unary operators are supported")
if isinstance(node, ast.Name):
if node.id not in env:
raise NameError(f"{node.id!r} is not defined")
return env[node.id]
if isinstance(node, ast.Constant):
value = node.value
if isinstance(value, bool):
raise TypeError("boolean literals are not supported")
if isinstance(value, int):
return StreamNumber(literal=str(value))
if isinstance(value, str):
return _coerce_literal_or_path(value)
raise TypeError(f"unsupported literal type {type(value)!r}")
if isinstance(node, ast.Call):
raise TypeError("function calls are not allowed in expressions")
raise TypeError(f"unsupported syntax: {ast.dump(node)}")
def _coerce_literal_or_path(value: str) -> StreamNumber:
candidate = Path(value)
if candidate.exists():
return StreamNumber(candidate)
return StreamNumber(literal=value)
def _preview_digits(number: StreamNumber, limit: int) -> str:
remaining = max(0, limit)
pieces: list[str] = []
for chunk in number.stream():
if remaining <= 0:
break
slice_chunk = chunk[:remaining]
pieces.append(slice_chunk)
remaining -= len(slice_chunk)
return "".join(pieces) or "0"
def _has_more_digits(number: StreamNumber, limit: int) -> bool:
total = 0
for chunk in number.stream():
total += len(chunk)
if total > limit:
return True
return False
def _save_variable(env: Dict[str, StreamNumber], name: str, filename: str) -> None:
number = env.get(name)
if number is None:
print(f"[warn] no variable named {name!r}")
return
digits, _ = _collect_digits(number, limit=None)
target = Path(filename)
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(digits + "\n", encoding="utf-8")
print(f"[ok] saved {name} to {target.resolve()}")
def _collect_digits(number: StreamNumber, limit: Optional[int]) -> tuple[str, bool]:
remaining = limit
pieces: list[str] = []
truncated = False
for chunk in number.stream():
if remaining is not None:
if remaining <= 0:
truncated = True
break
if len(chunk) > remaining:
pieces.append(chunk[:remaining])
truncated = True
remaining = 0
break
remaining -= len(chunk)
pieces.append(chunk)
return ("".join(pieces) or "0", truncated)
def _clear_screen() -> None:
import os
os.system("cls" if os.name == "nt" else "clear")
def _wipe_instance() -> None:
from shutil import rmtree
root = Path("instance")
if root.exists():
rmtree(root, ignore_errors=True)
def _shutdown(env: Dict[str, StreamNumber], exit_code: int, skip_cleanup: bool) -> None:
for number in env.values():
free_stream(number)
env.clear()
set_manual_free_only(False)
if not skip_cleanup:
_wipe_instance()
print("bye have a great time UwU")
raise SystemExit(exit_code or 621)

View File

@ -1,11 +1,19 @@
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable, List, Dict
from typing import Iterable, List, Dict, Union
LOG_DB_PATH = Path("./instance/mathstream_logs.sqlite")
def _is_relative_to(path: Path, other: Path) -> bool:
try:
path.resolve().relative_to(other.resolve())
return True
except ValueError:
return False
def _normalize_paths(paths: Iterable[Path]) -> List[str]:
return [str(Path(p).resolve()) for p in paths]
@ -218,3 +226,33 @@ def tracked_files() -> Dict[str, int]:
with sqlite3.connect(LOG_DB_PATH) as conn:
rows = conn.execute("SELECT path, ref_count FROM refs").fetchall()
return {path: ref_count for path, ref_count in rows}
def instance_stats(root: Union[str, Path] = Path("instance")) -> Dict[str, Union[str, int, bool]]:
"""Return simple usage statistics for the staging directory."""
base = Path(root)
resolved = base.resolve() if base.exists() else base
stats: Dict[str, Union[str, int, bool]] = {
"root": str(resolved),
"exists": base.exists(),
"total_files": 0,
"total_bytes": 0,
"log_files": 0,
"log_bytes": 0,
}
if not base.exists():
return stats
log_dir = base / "log"
for candidate in base.rglob("*"):
if not candidate.is_file():
continue
size = candidate.stat().st_size
stats["total_files"] += 1
stats["total_bytes"] += size
if _is_relative_to(candidate, log_dir):
stats["log_files"] += 1
stats["log_bytes"] += size
return stats