# 2025-11-05 16:35:15 +01:00
#
# 422 lines
# 13 KiB
# Python

from __future__ import annotations
import argparse
import ast
from pathlib import Path
from typing import Dict, Optional
class SkipCleanupExit(Exception):
    """Raised to leave the REPL while keeping the staging directory intact.

    ``_handle_command`` raises it for the ``exit -s`` input; ``run`` catches
    it and sets the skip-cleanup flag before shutting down.
    """
from . import (
StreamNumber,
add,
sub,
mul,
div,
mod,
pow,
free_stream,
clear_logs,
tracked_files,
active_streams,
set_manual_free_only,
manual_free_only_enabled,
instance_stats,
)
# Prompt text passed to ``input()`` by ``run`` for every line it reads.
Prompt = "mathstream> "
def _cleanup_mode_label(manual_only: bool) -> str:
    """Return the human-readable label for a cleanup mode flag.

    Args:
        manual_only: Truthy when automatic cleanup is switched off.

    Returns:
        ``"manual (auto cleanup off)"`` when *manual_only* is truthy,
        otherwise ``"auto (auto cleanup on)"``.
    """
    if manual_only:
        return "manual (auto cleanup off)"
    return "auto (auto cleanup on)"
def run(argv: Optional[list[str]] = None) -> None:
    """Run the interactive mathstream shell until the user leaves it.

    Parses the command-line options, applies the requested cleanup mode,
    then reads lines in a loop. Each non-empty line is tried, in order, as a
    bare ``save name filename`` request, as a built-in command (see
    ``_handle_command``), and finally as an expression or assignment (see
    ``_evaluate_line``). Expression results are stored under ``_`` and a
    preview of their digits is printed.

    Args:
        argv: Argument list handed to ``argparse``; ``None`` lets argparse
            read ``sys.argv``.

    Raises:
        SystemExit: Always, via ``_shutdown``, once the loop ends. The status
            handed to ``_shutdown`` is 42 for end-of-file on input, 621 for
            ``quit``/``exit`` and 0 for ``exit -s``.
    """
    parser = argparse.ArgumentParser(
        description="Interactive shell for mathstream streamed arithmetic."
    )
    parser.add_argument(
        "--cleanmode",
        choices=("auto", "manual"),
        default="manual",
        help=(
            "Choose how staged files are cleaned up: 'auto' deletes them when streams "
            "fall out of scope; 'manual' keeps them until freed or purged."
        ),
    )
    # Hidden legacy switch, kept so old invocations still work.
    parser.add_argument(
        "--manual-free",
        dest="deprecated_manual_free",
        action="store_true",
        help=argparse.SUPPRESS,
    )
    parser.add_argument(
        "--preview",
        type=int,
        default=80,
        help="Digits to preview when printing results (default: 80).",
    )
    args = parser.parse_args(argv)

    manual_mode = args.cleanmode == "manual"
    # ``store_true`` guarantees the attribute exists, so no getattr fallback.
    if args.deprecated_manual_free:
        manual_mode = True
        print("[warn] --manual-free is deprecated; use '--cleanmode manual' instead.")
    set_manual_free_only(manual_mode)

    env: Dict[str, StreamNumber] = {}
    _greetings(manual_mode)

    # Every ``break`` below assigns its own status, so this initial value is
    # never the one passed to ``_shutdown``.
    exit_code = 69
    skip_cleanup = False

    while True:
        try:
            line = input(Prompt)
        except EOFError:
            print()
            exit_code = 42
            break
        except KeyboardInterrupt:
            # Ctrl-C only abandons the current input line.
            print()
            continue

        stripped = line.strip()
        if not stripped:
            continue

        # Bare ``save`` (no colon) uses maxsplit so the filename may contain
        # spaces.
        if stripped.startswith("save "):
            parts = stripped.split(maxsplit=2)
            if len(parts) != 3:
                print("usage: save name filename")
            else:
                _, name, filename = parts
                try:
                    _save_variable(env, name, filename)
                except Exception as exc:
                    # Report the failure instead of killing the shell.
                    print(f"[error] {exc}")
            continue

        try:
            if _handle_command(stripped, env):
                continue
        except SkipCleanupExit:
            skip_cleanup = True
            exit_code = 0
            break
        except EOFError:
            # ``quit`` / ``exit`` are signalled with EOFError.
            exit_code = 621
            break
        except Exception as exc:
            # A failing command is reported the same way a failing
            # expression is, rather than terminating the shell with a
            # traceback and no cleanup.
            print(f"[error] {exc}")
            continue

        try:
            result = _evaluate_line(line, env)
        except Exception as exc:
            print(f"[error] {exc}")
            continue
        if result is None:
            # Assignments print their own confirmation.
            continue

        env["_"] = result
        preview = _preview_digits(result, args.preview)
        digits_count = sum(1 for ch in preview if ch.isdigit())
        print(f"[result] {result.path} ({digits_count} digits previewed)")
        # Both branches of the original conditional were empty strings, so
        # the truncation check had no visible effect; mark cut-off previews.
        marker = "..." if _has_more_digits(result, args.preview) else ""
        print(f" {preview}{marker}")

    _shutdown(env, exit_code, skip_cleanup)
def _greetings(manual_only: bool) -> None:
    """Print the startup banner followed by the active cleanup mode.

    Args:
        manual_only: Cleanup flag forwarded to ``_cleanup_mode_label`` to
            build the second line.
    """
    banner = "mathstream REPL ready. type ':help' for commands."
    print(banner)
    mode_label = _cleanup_mode_label(manual_only)
    print("cleanup mode: " + mode_label)
def _handle_command(line: str, env: Dict[str, StreamNumber]) -> bool:
    """Process one REPL line if it is an exit request or a ``:`` command.

    Args:
        line: The input line; ``run`` passes it already stripped.
        env: Variable table mapping names to stream handles. ``:free`` and
            ``:purge`` mutate it in place.

    Returns:
        ``True`` when the line was consumed as a ``:`` command (including
        malformed and unknown ones), ``False`` when the line is not a
        command and should be evaluated by the caller.

    Raises:
        SkipCleanupExit: For the exact input ``exit -s``.
        EOFError: For the exact inputs ``quit`` and ``exit``.
    """
    # Exit requests are recognised without the leading colon.
    if line == "exit -s":
        raise SkipCleanupExit
    if line == "quit" or line == "exit":
        raise EOFError
    if line[:1] != ":":
        return False

    tokens = line[1:].split()
    if not tokens:
        # A lone ":" is swallowed silently.
        return True
    cmd = tokens[0]
    rest = tokens[1:]

    if cmd == "h" or cmd == "help":
        help_text = (
            "commands:\n"
            " :vars list stored variables\n"
            " :show name [-f] preview digits for a variable; use -f to print full value\n"
            " :save name path write the digits for a variable to a file (relative to cwd)\n"
            " :free name free a variable and delete its staged file\n"
            " :purge free all variables and clear logs directory\n"
            " :cleanmode [mode] set cleanup mode to 'auto' or 'manual' (default manual)\n"
            " :stats [path] summarize file counts and bytes (default: ./instance)\n"
            " :tracked show tracked file counts from sqlite\n"
            " :active show in-memory StreamNumber handles\n"
            " :reload reload the sqlite tracker without clearing files\n"
            " :help this message\n"
            " :clear / :cls clear the terminal display\n"
            " exit/quit leave the REPL"
        )
        print(help_text)
        return True

    if cmd == "vars":
        if env:
            for var_name, handle in env.items():
                print(f"{var_name:>8} -> {handle.path}")
        else:
            print("(no variables)")
        return True

    if cmd == "show":
        if not rest:
            print("usage: :show name [-f]")
            return True
        name = rest[0]
        show_all = "-f" in rest[1:]
        handle = env.get(name)
        if handle is None:
            print(f"[warn] no variable named {name!r}")
            return True
        if show_all:
            digits, _ = _collect_digits(handle, limit=None)
            print(f"{name} = {digits}")
            return True
        preview, truncated = _collect_digits(handle, limit=100)
        if truncated:
            # Over-long values are not printed at all without -f.
            print(
                f"[warn] {name} exceeds 100 digits. "
                "Use ':show name -f' to print the full value."
            )
        else:
            print(f"{name} = {preview}")
        return True

    if cmd == "save":
        if len(rest) == 2:
            name, filename = rest
            _save_variable(env, name, filename)
        else:
            print("usage: :save name path")
        return True

    if cmd == "free":
        if not rest:
            print("usage: :free name")
            return True
        name = rest[0]
        handle = env.pop(name, None)
        if handle is None:
            print(f"[warn] no variable named {name!r}")
        else:
            free_stream(handle)
            print(f"[ok] freed {name}")
        return True

    if cmd == "purge":
        for handle in env.values():
            free_stream(handle)
        env.clear()
        clear_logs()
        print("[ok] cleared environment and staging directory")
        return True

    if cmd == "cleanmode":
        if not rest:
            current = _cleanup_mode_label(manual_free_only_enabled())
            print(f"[info] cleanup mode is {current}")
            return True
        # Exactly one argument is accepted; anything else is a usage error.
        requested = rest[0].lower() if len(rest) == 1 else None
        if requested not in ("auto", "manual"):
            print("usage: :cleanmode [auto|manual]")
            return True
        manual = requested == "manual"
        set_manual_free_only(manual)
        print(f"[ok] cleanup mode set to {_cleanup_mode_label(manual)}")
        return True

    if cmd == "stats":
        if rest:
            target = Path(rest[0]).expanduser()
        else:
            target = Path("instance")
        info = instance_stats(target)
        print(f"[stats] root={info['root']}")
        if info["exists"]:
            file_total = info["total_files"]
            byte_total = info["total_bytes"]
            log_file_total = info["log_files"]
            log_byte_total = info["log_bytes"]
            print(f" files: {file_total} total (log/: {log_file_total})")
            print(f" bytes: {byte_total} total (log/: {log_byte_total})")
        else:
            print(" (directory missing)")
        return True

    if cmd == "clear" or cmd == "cls":
        _clear_screen()
        return True

    if cmd == "tracked":
        tracked = tracked_files()
        if tracked:
            for path, count in tracked.items():
                print(f"{path} (ref_count={count})")
        else:
            print("(no tracked files)")
        return True

    if cmd == "active":
        active = active_streams()
        if active:
            for path, count in active.items():
                print(f"{path} (python_refs={count})")
        else:
            print("(no active streams)")
        return True

    if cmd == "reload":
        print("tracker reload not implemented yet.")
        return True

    print(f"[warn] unknown command :{cmd}")
    return True
def _evaluate_line(line: str, env: Dict[str, StreamNumber]) -> Optional[StreamNumber]:
    """Evaluate one REPL line as a bare expression or a single assignment.

    Args:
        line: Raw input text; surrounding whitespace is ignored.
        env: Variable table. An assignment rebinds its target in place.

    Returns:
        The resulting stream for a bare expression, or ``None`` for an
        assignment (which prints its own ``[set]`` confirmation).

    Raises:
        SyntaxError: If the text does not parse, holds more than one
            statement, is a statement other than an expression or an
            assignment, or assigns to anything but a single plain name.
        Exception: Whatever ``_eval_expr`` raises for the expression itself.
    """
    # Leading whitespace would otherwise be reported as an indentation error
    # in "exec" mode.
    source = line.strip()
    try:
        module = ast.parse(source, mode="exec")
    except SyntaxError:
        # Retained fallback: retry as a pure expression; if that fails too,
        # its SyntaxError is what the caller reports.
        expr_node = ast.parse(source, mode="eval")
        return _eval_expr(expr_node.body, env)

    if len(module.body) != 1:
        raise SyntaxError("only single assignment statements are allowed (e.g. x = 2 + 2)")
    statement = module.body[0]

    # A valid expression also parses in "exec" mode, as an ast.Expr
    # statement. It must be evaluated here; previously it fell through to
    # the assignment check and was rejected.
    if isinstance(statement, ast.Expr):
        return _eval_expr(statement.value, env)

    if not isinstance(statement, ast.Assign):
        raise SyntaxError("only single assignment statements are allowed (e.g. x = 2 + 2)")
    if len(statement.targets) != 1 or not isinstance(statement.targets[0], ast.Name):
        raise SyntaxError("left-hand side must be a simple variable name")

    target = statement.targets[0].id
    result = _eval_expr(statement.value, env)

    # pop-then-insert keeps the original behaviour of moving a rebound name
    # to the end of the variable listing.
    previous = env.pop(target, None)
    env[target] = result
    if previous is not None:
        # ``x = x`` and aliases such as ``y = x`` leave the old object
        # reachable through env; freeing it would invalidate a live binding.
        still_bound = any(previous is bound for bound in env.values())
        if not still_bound:
            free_stream(previous)

    print(f"[set] {target} = {result.path}")
    return None
def _eval_expr(node: ast.AST, env: Dict[str, StreamNumber]) -> StreamNumber:
    """Recursively evaluate a parsed expression into a stream handle.

    Supported syntax: the binary operators ``+ - * / % **``, unary ``+`` and
    ``-``, variable names looked up in *env*, integer literals, and string
    literals (resolved by ``_coerce_literal_or_path``).

    Args:
        node: Expression node produced by ``ast.parse``.
        env: Variable table used to resolve names.

    Returns:
        The stream handle for the expression. A bare name returns the very
        object stored in *env*, not a copy.

    Raises:
        NameError: If a referenced variable is not in *env*.
        TypeError: For unsupported operators, literals, function calls or
            any other syntax.
    """
    # NOTE(review): handles created here for literals and intermediate
    # results are never freed by this function; confirm who owns them.
    if isinstance(node, ast.BinOp):
        # Reject unsupported operators before evaluating operands so no
        # streams are created for an expression that cannot succeed.
        supported = (ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Mod, ast.Pow)
        if not isinstance(node.op, supported):
            raise TypeError(f"unsupported operator {ast.dump(node.op)}")
        left = _eval_expr(node.left, env)
        right = _eval_expr(node.right, env)
        if isinstance(node.op, ast.Add):
            return add(left, right)
        if isinstance(node.op, ast.Sub):
            return sub(left, right)
        if isinstance(node.op, ast.Mult):
            return mul(left, right)
        if isinstance(node.op, ast.Div):
            return div(left, right)
        if isinstance(node.op, ast.Mod):
            return mod(left, right)
        # Only ast.Pow remains after the check above.
        return pow(left, right)

    if isinstance(node, ast.UnaryOp):
        if not isinstance(node.op, (ast.UAdd, ast.USub)):
            raise TypeError("only +x and -x unary operators are supported")
        operand = _eval_expr(node.operand, env)
        if isinstance(node.op, ast.UAdd):
            return operand
        # Negation is computed as 0 - operand with a temporary zero.
        zero = StreamNumber(literal="0")
        try:
            return sub(zero, operand)
        finally:
            # Release the temporary even when the subtraction fails.
            zero.free()

    if isinstance(node, ast.Name):
        if node.id not in env:
            raise NameError(f"{node.id!r} is not defined")
        return env[node.id]

    if isinstance(node, ast.Constant):
        value = node.value
        # bool is a subclass of int, so it has to be rejected first.
        if isinstance(value, bool):
            raise TypeError("boolean literals are not supported")
        if isinstance(value, int):
            return StreamNumber(literal=str(value))
        if isinstance(value, str):
            return _coerce_literal_or_path(value)
        raise TypeError(f"unsupported literal type {type(value)!r}")

    if isinstance(node, ast.Call):
        raise TypeError("function calls are not allowed in expressions")
    raise TypeError(f"unsupported syntax: {ast.dump(node)}")
def _coerce_literal_or_path(value: str) -> StreamNumber:
    """Turn a string literal into a stream, treating it as a file if one exists.

    Args:
        value: Text of a string literal from the expression.

    Returns:
        ``StreamNumber(Path(value))`` when *value* names an existing regular
        file, otherwise ``StreamNumber(literal=value)``.
    """
    candidate = Path(value)
    try:
        # is_file() rather than exists(): directories are not numbers, and
        # the empty string would otherwise resolve to the current directory.
        names_file = candidate.is_file()
    except (OSError, ValueError):
        # A long digit string can exceed the OS file-name limit and make the
        # probe raise; such a value can only be a literal.
        names_file = False
    if names_file:
        return StreamNumber(candidate)
    return StreamNumber(literal=value)
def _preview_digits(number: StreamNumber, limit: int) -> str:
    """Return at most *limit* leading characters of a stream.

    Args:
        number: Object whose ``stream()`` yields string chunks.
        limit: Maximum number of characters to keep; negative values are
            treated as zero.

    Returns:
        The collected prefix, or ``"0"`` when nothing was collected.
    """
    budget = limit if limit > 0 else 0
    taken: list[str] = []
    for block in number.stream():
        if budget <= 0:
            break
        head = block[:budget]
        budget -= len(head)
        taken.append(head)
    text = "".join(taken)
    return text if text else "0"
def _has_more_digits(number: StreamNumber, limit: int) -> bool:
    """Report whether a stream holds more than *limit* characters.

    Args:
        number: Object whose ``stream()`` yields string chunks.
        limit: Character count to compare against.

    Returns:
        ``True`` as soon as the running length exceeds *limit*; ``False`` if
        the stream ends first.
    """
    # Count down from the limit; going negative means it was exceeded.
    headroom = limit
    for piece in number.stream():
        headroom -= len(piece)
        if headroom < 0:
            return True
    return False
def _save_variable(env: Dict[str, StreamNumber], name: str, filename: str) -> None:
    """Write the digits of a stored variable to a text file.

    The file receives the concatenated chunks of the variable's stream (or
    ``0`` when the stream yields nothing) followed by a newline. Missing
    parent directories are created.

    Args:
        env: Variable table to look *name* up in.
        name: Variable to save; a warning is printed if it is not defined.
        filename: Destination path.

    Raises:
        OSError: If the directory or file cannot be created or written.
    """
    number = env.get(name)
    if number is None:
        print(f"[warn] no variable named {name!r}")
        return

    target = Path(filename)
    target.parent.mkdir(parents=True, exist_ok=True)

    # Copy chunk by chunk so memory use does not grow with the number's
    # length; joining everything first defeats the streamed representation.
    wrote_any = False
    with target.open("w", encoding="utf-8") as handle:
        for chunk in number.stream():
            if chunk:
                handle.write(chunk)
                wrote_any = True
        if not wrote_any:
            # An empty stream is saved as a single zero.
            handle.write("0")
        handle.write("\n")

    print(f"[ok] saved {name} to {target.resolve()}")
def _collect_digits(number: StreamNumber, limit: Optional[int]) -> tuple[str, bool]:
    """Gather a stream's characters into one string, optionally capped.

    Args:
        number: Object whose ``stream()`` yields string chunks.
        limit: Maximum number of characters to keep, or ``None`` for all.

    Returns:
        ``(text, truncated)``. *text* is ``"0"`` when nothing was collected;
        *truncated* is ``True`` when a chunk arrived after the limit was
        reached or had to be cut to fit.
    """
    if limit is None:
        everything = "".join(number.stream())
        return (everything if everything else "0", False)

    budget = limit
    kept: list[str] = []
    cut_short = False
    for block in number.stream():
        if budget <= 0 or len(block) > budget:
            cut_short = True
            if budget > 0:
                # Keep only the part of this chunk that still fits.
                kept.append(block[:budget])
            break
        kept.append(block)
        budget -= len(block)
    text = "".join(kept)
    return (text if text else "0", cut_short)
def _clear_screen() -> None:
    """Clear the terminal by running the platform's clear command."""
    import os

    # Windows (os.name == "nt") uses "cls"; everything else uses "clear".
    command = "clear"
    if os.name == "nt":
        command = "cls"
    os.system(command)
def _wipe_instance() -> None:
    """Remove the ``instance`` directory under the current working directory.

    Does nothing when the path is absent; errors during removal are ignored.
    """
    from shutil import rmtree

    staging_root = Path("instance")
    if not staging_root.exists():
        return
    rmtree(staging_root, ignore_errors=True)
def _shutdown(env: Dict[str, StreamNumber], exit_code: int, skip_cleanup: bool) -> None:
    """Release all variables, restore auto cleanup and terminate the process.

    Args:
        env: Variable table; every stored handle is freed and the table is
            emptied.
        exit_code: Process exit status. ``None`` falls back to 621.
        skip_cleanup: When true, the ``instance`` directory is left in place.

    Raises:
        SystemExit: Always, carrying *exit_code*.
    """
    for number in env.values():
        free_stream(number)
    env.clear()
    set_manual_free_only(False)
    if not skip_cleanup:
        _wipe_instance()
    print("bye have a great time UwU")
    # ``exit_code or 621`` also replaced the legitimate status 0 that ``run``
    # passes for ``exit -s``; fall back only when no status was supplied.
    raise SystemExit(621 if exit_code is None else exit_code)