422 lines
13 KiB
Python
422 lines
13 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import ast
|
|
from pathlib import Path
|
|
from typing import Dict, Optional
|
|
|
|
|
|
class SkipCleanupExit(Exception):
    """Signal that the REPL should exit without wiping the staging directory.

    Raised by ``_handle_command`` when the input is exactly ``exit -s`` and
    caught in ``run``, which then asks ``_shutdown`` to skip the wipe step.
    """
|
|
|
|
from . import (
|
|
StreamNumber,
|
|
add,
|
|
sub,
|
|
mul,
|
|
div,
|
|
mod,
|
|
pow,
|
|
free_stream,
|
|
clear_logs,
|
|
tracked_files,
|
|
active_streams,
|
|
set_manual_free_only,
|
|
manual_free_only_enabled,
|
|
instance_stats,
|
|
)
|
|
|
|
# Prompt text passed to ``input()`` on every iteration of the REPL loop.
Prompt = "mathstream> "
|
|
|
|
|
|
def _cleanup_mode_label(manual_only: bool) -> str:
|
|
return "manual (auto cleanup off)" if manual_only else "auto (auto cleanup on)"
|
|
|
|
|
|
def run(argv: Optional[list[str]] = None) -> None:
    """Run the interactive mathstream REPL until the user leaves it.

    Parses the command-line options, applies the requested cleanup mode,
    then loops: read a line, try it as a ``save`` shortcut, then as a
    command (``_handle_command``), and finally as an expression or
    assignment (``_evaluate_line``).

    Args:
        argv: Arguments to parse. ``None`` lets ``argparse`` fall back to
            ``sys.argv``.

    Raises:
        SystemExit: Always, via ``_shutdown``, once the loop ends.
    """
    parser = argparse.ArgumentParser(
        description="Interactive shell for mathstream streamed arithmetic."
    )
    parser.add_argument(
        "--cleanmode",
        choices=("auto", "manual"),
        default="manual",
        help=(
            "Choose how staged files are cleaned up: 'auto' deletes them when streams "
            "fall out of scope; 'manual' keeps them until freed or purged."
        ),
    )
    # Hidden legacy switch; it forces manual mode and prints a warning.
    parser.add_argument(
        "--manual-free",
        dest="deprecated_manual_free",
        action="store_true",
        help=argparse.SUPPRESS,
    )
    parser.add_argument(
        "--preview",
        type=int,
        default=80,
        help="Digits to preview when printing results (default: 80).",
    )
    args = parser.parse_args(argv)

    manual_mode = args.cleanmode == "manual"
    if getattr(args, "deprecated_manual_free", False):
        manual_mode = True
        print("[warn] --manual-free is deprecated; use '--cleanmode manual' instead.")

    set_manual_free_only(manual_mode)
    env: Dict[str, StreamNumber] = {}
    _greetings(manual_mode)

    # Every ``break`` below overwrites this value before leaving the loop.
    exit_code = 69
    skip_cleanup = False
    while True:
        try:
            line = input(Prompt)
        except EOFError:
            print()
            exit_code = 42
            break
        except KeyboardInterrupt:
            # Ctrl-C only abandons the current input line.
            print()
            continue

        stripped = line.strip()
        if not stripped:
            continue

        if stripped.startswith("save "):
            # maxsplit=2 keeps any spaces inside the filename intact.
            parts = stripped.split(maxsplit=2)
            if len(parts) != 3:
                print("usage: save name filename")
            else:
                _, name, filename = parts
                try:
                    _save_variable(env, name, filename)
                except Exception as exc:
                    # A failed write must not take the whole session down.
                    print(f"[error] {exc}")
            continue

        try:
            if _handle_command(stripped, env):
                continue
        except SkipCleanupExit:
            skip_cleanup = True
            exit_code = 0
            break
        except EOFError:
            # ``quit`` / ``exit`` are signalled by _handle_command as EOFError.
            exit_code = 621
            break
        except Exception as exc:
            # Command failures are reported like evaluation failures instead
            # of escaping the loop and bypassing _shutdown.
            print(f"[error] {exc}")
            continue

        try:
            # Use the stripped text: ast.parse rejects leading indentation.
            result = _evaluate_line(stripped, env)
        except Exception as exc:
            print(f"[error] {exc}")
            continue

        if result is None:
            continue

        env["_"] = result
        preview = _preview_digits(result, args.preview)
        digits_count = sum(1 for ch in preview if ch.isdigit())
        print(f"[result] {result.path} ({digits_count} digits previewed)")
        print(
            f" {preview}"
            f"{'…' if _has_more_digits(result, args.preview) else ''}"
        )

    _shutdown(env, exit_code, skip_cleanup)
|
|
|
|
|
|
def _greetings(manual_only: bool) -> None:
    """Print the startup banner followed by the active cleanup mode.

    Args:
        manual_only: Whether manual cleanup mode is active; selects the
            label printed on the second line.
    """
    banner = "mathstream REPL ready. type ':help' for commands."
    mode_line = f"cleanup mode: {_cleanup_mode_label(manual_only)}"
    for text in (banner, mode_line):
        print(text)
|
|
|
|
|
|
def _handle_command(line: str, env: Dict[str, StreamNumber]) -> bool:
|
|
if line == "exit -s":
|
|
raise SkipCleanupExit
|
|
if line in {"quit", "exit"}:
|
|
raise EOFError
|
|
if not line.startswith(":"):
|
|
return False
|
|
|
|
parts = line[1:].split()
|
|
if not parts:
|
|
return True
|
|
|
|
cmd, *rest = parts
|
|
if cmd in {"h", "help"}:
|
|
print(
|
|
"commands:\n"
|
|
" :vars list stored variables\n"
|
|
" :show name [-f] preview digits for a variable; use -f to print full value\n"
|
|
" :save name path write the digits for a variable to a file (relative to cwd)\n"
|
|
" :free name free a variable and delete its staged file\n"
|
|
" :purge free all variables and clear logs directory\n"
|
|
" :cleanmode [mode] set cleanup mode to 'auto' or 'manual' (default manual)\n"
|
|
" :stats [path] summarize file counts and bytes (default: ./instance)\n"
|
|
" :tracked show tracked file counts from sqlite\n"
|
|
" :active show in-memory StreamNumber handles\n"
|
|
" :reload reload the sqlite tracker without clearing files\n"
|
|
" :help this message\n"
|
|
" :clear / :cls clear the terminal display\n"
|
|
" exit/quit leave the REPL"
|
|
)
|
|
elif cmd == "vars":
|
|
if not env:
|
|
print("(no variables)")
|
|
else:
|
|
for name, number in env.items():
|
|
print(f"{name:>8} -> {number.path}")
|
|
elif cmd == "show":
|
|
if not rest:
|
|
print("usage: :show name [-f]")
|
|
else:
|
|
name = rest[0]
|
|
force = "-f" in rest[1:]
|
|
number = env.get(name)
|
|
if number is None:
|
|
print(f"[warn] no variable named {name!r}")
|
|
else:
|
|
if force:
|
|
digits, _ = _collect_digits(number, limit=None)
|
|
print(f"{name} = {digits}")
|
|
else:
|
|
preview, truncated = _collect_digits(number, limit=100)
|
|
if truncated:
|
|
print(
|
|
f"[warn] {name} exceeds 100 digits. "
|
|
"Use ':show name -f' to print the full value."
|
|
)
|
|
else:
|
|
print(f"{name} = {preview}")
|
|
elif cmd == "save":
|
|
if len(rest) != 2:
|
|
print("usage: :save name path")
|
|
else:
|
|
name, filename = rest
|
|
_save_variable(env, name, filename)
|
|
elif cmd == "free":
|
|
if not rest:
|
|
print("usage: :free name")
|
|
else:
|
|
name = rest[0]
|
|
number = env.pop(name, None)
|
|
if number is None:
|
|
print(f"[warn] no variable named {name!r}")
|
|
else:
|
|
free_stream(number)
|
|
print(f"[ok] freed {name}")
|
|
elif cmd == "purge":
|
|
for number in env.values():
|
|
free_stream(number)
|
|
env.clear()
|
|
clear_logs()
|
|
print("[ok] cleared environment and staging directory")
|
|
elif cmd == "cleanmode":
|
|
if not rest:
|
|
mode = _cleanup_mode_label(manual_free_only_enabled())
|
|
print(f"[info] cleanup mode is {mode}")
|
|
elif len(rest) != 1 or rest[0].lower() not in {"auto", "manual"}:
|
|
print("usage: :cleanmode [auto|manual]")
|
|
else:
|
|
target_mode = rest[0].lower()
|
|
manual = target_mode == "manual"
|
|
set_manual_free_only(manual)
|
|
print(f"[ok] cleanup mode set to {_cleanup_mode_label(manual)}")
|
|
elif cmd == "stats":
|
|
target = Path(rest[0]).expanduser() if rest else Path("instance")
|
|
info = instance_stats(target)
|
|
print(f"[stats] root={info['root']}")
|
|
if not info["exists"]:
|
|
print(" (directory missing)")
|
|
else:
|
|
total_files = info["total_files"]
|
|
total_bytes = info["total_bytes"]
|
|
log_files = info["log_files"]
|
|
log_bytes = info["log_bytes"]
|
|
print(f" files: {total_files} total (log/: {log_files})")
|
|
print(f" bytes: {total_bytes} total (log/: {log_bytes})")
|
|
elif cmd in {"clear", "cls"}:
|
|
_clear_screen()
|
|
elif cmd == "tracked":
|
|
tracked = tracked_files()
|
|
if not tracked:
|
|
print("(no tracked files)")
|
|
else:
|
|
for path, count in tracked.items():
|
|
print(f"{path} (ref_count={count})")
|
|
elif cmd == "active":
|
|
active = active_streams()
|
|
if not active:
|
|
print("(no active streams)")
|
|
else:
|
|
for path, count in active.items():
|
|
print(f"{path} (python_refs={count})")
|
|
elif cmd == "reload":
|
|
print("tracker reload not implemented yet.")
|
|
else:
|
|
print(f"[warn] unknown command :{cmd}")
|
|
|
|
return True
|
|
|
|
|
|
def _evaluate_line(line: str, env: Dict[str, StreamNumber]) -> Optional[StreamNumber]:
    """Evaluate one REPL line that is a bare expression or an assignment.

    A bare expression (``2 + 2``) is evaluated and its value returned so the
    caller can preview it. A simple assignment (``x = 2 + 2``) stores the
    value in ``env``, prints a ``[set]`` line and returns ``None``.

    Args:
        line: Source text of the line; surrounding whitespace is ignored.
        env: Variable environment, updated in place by assignments.

    Returns:
        The evaluated stream for a bare expression, ``None`` for an assignment.

    Raises:
        SyntaxError: The text does not parse, holds more than one statement,
            is neither an expression nor an assignment, or assigns to
            anything other than a single plain name.
    """
    # ast.parse treats leading whitespace as unexpected indentation.
    tree = ast.parse(line.strip(), mode="exec")

    if len(tree.body) != 1:
        raise SyntaxError(
            "only a single expression or assignment is allowed (e.g. x = 2 + 2)"
        )

    statement = tree.body[0]

    # A bare expression parses in exec mode as an Expr statement, so it has
    # to be recognised here; a retry in eval mode can never succeed where
    # exec mode failed.
    if isinstance(statement, ast.Expr):
        return _eval_expr(statement.value, env)

    if not isinstance(statement, ast.Assign):
        raise SyntaxError(
            "only a single expression or assignment is allowed (e.g. x = 2 + 2)"
        )

    if len(statement.targets) != 1 or not isinstance(statement.targets[0], ast.Name):
        raise SyntaxError("left-hand side must be a simple variable name")

    target = statement.targets[0].id
    result = _eval_expr(statement.value, env)

    previous = env.pop(target, None)
    # ``x = x`` (or ``x = +x``) yields the very object already bound to the
    # name; freeing it here would leave the variable pointing at a freed stream.
    if previous is not None and previous is not result:
        free_stream(previous)

    env[target] = result
    print(f"[set] {target} = {result.path}")
    return None
|
|
|
|
|
|
def _eval_expr(node: ast.AST, env: Dict[str, StreamNumber]) -> StreamNumber:
|
|
if isinstance(node, ast.BinOp):
|
|
left = _eval_expr(node.left, env)
|
|
right = _eval_expr(node.right, env)
|
|
if isinstance(node.op, ast.Add):
|
|
return add(left, right)
|
|
if isinstance(node.op, ast.Sub):
|
|
return sub(left, right)
|
|
if isinstance(node.op, ast.Mult):
|
|
return mul(left, right)
|
|
if isinstance(node.op, ast.Div):
|
|
return div(left, right)
|
|
if isinstance(node.op, ast.Mod):
|
|
return mod(left, right)
|
|
if isinstance(node.op, ast.Pow):
|
|
return pow(left, right)
|
|
raise TypeError(f"unsupported operator {ast.dump(node.op)}")
|
|
|
|
if isinstance(node, ast.UnaryOp):
|
|
operand = _eval_expr(node.operand, env)
|
|
if isinstance(node.op, ast.UAdd):
|
|
return operand
|
|
if isinstance(node.op, ast.USub):
|
|
zero = StreamNumber(literal="0")
|
|
result = sub(zero, operand)
|
|
zero.free()
|
|
return result
|
|
raise TypeError("only +x and -x unary operators are supported")
|
|
|
|
if isinstance(node, ast.Name):
|
|
if node.id not in env:
|
|
raise NameError(f"{node.id!r} is not defined")
|
|
return env[node.id]
|
|
|
|
if isinstance(node, ast.Constant):
|
|
value = node.value
|
|
if isinstance(value, bool):
|
|
raise TypeError("boolean literals are not supported")
|
|
if isinstance(value, int):
|
|
return StreamNumber(literal=str(value))
|
|
if isinstance(value, str):
|
|
return _coerce_literal_or_path(value)
|
|
raise TypeError(f"unsupported literal type {type(value)!r}")
|
|
|
|
if isinstance(node, ast.Call):
|
|
raise TypeError("function calls are not allowed in expressions")
|
|
|
|
raise TypeError(f"unsupported syntax: {ast.dump(node)}")
|
|
|
|
|
|
def _coerce_literal_or_path(value: str) -> StreamNumber:
    """Turn a string literal into a stream, preferring an existing path.

    If ``value`` names something that exists on disk, the stream is built
    from that path; otherwise ``value`` itself is used as the literal.

    Args:
        value: Text of the string literal typed by the user.

    Returns:
        A ``StreamNumber`` built from the path or from the literal text.
    """
    candidate = Path(value)
    try:
        on_disk = candidate.exists()
    except OSError:
        # A long digit string is not a usable path; on some Python versions
        # the existence check raises (e.g. "File name too long") instead of
        # returning False. Treat that as "not a path" and use the literal.
        on_disk = False
    if on_disk:
        return StreamNumber(candidate)
    return StreamNumber(literal=value)
|
|
|
|
|
|
def _preview_digits(number: StreamNumber, limit: int) -> str:
|
|
remaining = max(0, limit)
|
|
pieces: list[str] = []
|
|
for chunk in number.stream():
|
|
if remaining <= 0:
|
|
break
|
|
slice_chunk = chunk[:remaining]
|
|
pieces.append(slice_chunk)
|
|
remaining -= len(slice_chunk)
|
|
return "".join(pieces) or "0"
|
|
|
|
|
|
def _has_more_digits(number: StreamNumber, limit: int) -> bool:
|
|
total = 0
|
|
for chunk in number.stream():
|
|
total += len(chunk)
|
|
if total > limit:
|
|
return True
|
|
return False
|
|
|
|
|
|
def _save_variable(env: Dict[str, StreamNumber], name: str, filename: str) -> None:
|
|
number = env.get(name)
|
|
if number is None:
|
|
print(f"[warn] no variable named {name!r}")
|
|
return
|
|
digits, _ = _collect_digits(number, limit=None)
|
|
target = Path(filename)
|
|
target.parent.mkdir(parents=True, exist_ok=True)
|
|
target.write_text(digits + "\n", encoding="utf-8")
|
|
print(f"[ok] saved {name} to {target.resolve()}")
|
|
|
|
|
|
def _collect_digits(number: StreamNumber, limit: Optional[int]) -> tuple[str, bool]:
|
|
remaining = limit
|
|
pieces: list[str] = []
|
|
truncated = False
|
|
for chunk in number.stream():
|
|
if remaining is not None:
|
|
if remaining <= 0:
|
|
truncated = True
|
|
break
|
|
if len(chunk) > remaining:
|
|
pieces.append(chunk[:remaining])
|
|
truncated = True
|
|
remaining = 0
|
|
break
|
|
remaining -= len(chunk)
|
|
pieces.append(chunk)
|
|
return ("".join(pieces) or "0", truncated)
|
|
|
|
|
|
def _clear_screen() -> None:
    """Clear the terminal by running the platform's clear command.

    Runs ``cls`` when ``os.name`` is ``"nt"`` and ``clear`` otherwise.
    """
    import os

    command = "clear"
    if os.name == "nt":
        command = "cls"
    os.system(command)
|
|
|
|
|
|
def _wipe_instance() -> None:
|
|
from shutil import rmtree
|
|
|
|
root = Path("instance")
|
|
if root.exists():
|
|
rmtree(root, ignore_errors=True)
|
|
|
|
|
|
def _shutdown(env: Dict[str, StreamNumber], exit_code: int, skip_cleanup: bool) -> None:
    """Release every stored stream, optionally wipe staging, then exit.

    Args:
        env: Variable environment; every value is freed and the mapping is
            emptied.
        exit_code: Requested process exit status.
        skip_cleanup: When ``True`` the ``instance`` directory is left alone.

    Raises:
        SystemExit: Always. A falsy ``exit_code`` (including ``0``) is
            replaced by ``621``.
    """
    for stream in env.values():
        free_stream(stream)
    env.clear()

    set_manual_free_only(False)

    if skip_cleanup:
        pass
    else:
        _wipe_instance()

    print("bye have a great time UwU")

    # NOTE(review): the fallback means a requested status of 0 is never
    # honoured; confirm whether that is intended.
    status = exit_code if exit_code else 621
    raise SystemExit(status)
|