from __future__ import annotations import argparse import ast from pathlib import Path from typing import Dict, Optional class SkipCleanupExit(Exception): """Signal that the REPL should exit without wiping the staging directory.""" from . import ( StreamNumber, add, sub, mul, div, mod, pow, free_stream, clear_logs, tracked_files, active_streams, set_manual_free_only, manual_free_only_enabled, instance_stats, ) Prompt = "mathstream> " def _cleanup_mode_label(manual_only: bool) -> str: return "manual (auto cleanup off)" if manual_only else "auto (auto cleanup on)" def run(argv: Optional[list[str]] = None) -> None: parser = argparse.ArgumentParser( description="Interactive shell for mathstream streamed arithmetic." ) parser.add_argument( "--cleanmode", choices=("auto", "manual"), default="manual", help=( "Choose how staged files are cleaned up: 'auto' deletes them when streams " "fall out of scope; 'manual' keeps them until freed or purged." ), ) parser.add_argument( "--manual-free", dest="deprecated_manual_free", action="store_true", help=argparse.SUPPRESS, ) parser.add_argument( "--preview", type=int, default=80, help="Digits to preview when printing results (default: 80).", ) args = parser.parse_args(argv) manual_mode = args.cleanmode == "manual" if getattr(args, "deprecated_manual_free", False): manual_mode = True print("[warn] --manual-free is deprecated; use '--cleanmode manual' instead.") set_manual_free_only(manual_mode) env: Dict[str, StreamNumber] = {} _greetings(manual_mode) exit_code = 69 skip_cleanup = False while True: try: line = input(Prompt) except EOFError: print() exit_code = 42 break except KeyboardInterrupt: print() continue stripped = line.strip() if not stripped: continue if stripped.startswith("save "): parts = stripped.split(maxsplit=2) if len(parts) != 3: print("usage: save name filename") else: _, name, filename = parts _save_variable(env, name, filename) continue try: if _handle_command(stripped, env): continue except SkipCleanupExit: skip_cleanup = True exit_code = 0 break except EOFError: exit_code = 621 break try: result = _evaluate_line(line, env) except Exception as exc: print(f"[error] {exc}") continue if result is None: continue env["_"] = result preview = _preview_digits(result, args.preview) digits_count = sum(1 for ch in preview if ch.isdigit()) print(f"[result] {result.path} ({digits_count} digits previewed)") print( f" {preview}" f"{'…' if _has_more_digits(result, args.preview) else ''}" ) _shutdown(env, exit_code, skip_cleanup) def _greetings(manual_only: bool) -> None: print("mathstream REPL ready. type ':help' for commands.") print(f"cleanup mode: {_cleanup_mode_label(manual_only)}") def _handle_command(line: str, env: Dict[str, StreamNumber]) -> bool: if line == "exit -s": raise SkipCleanupExit if line in {"quit", "exit"}: raise EOFError if not line.startswith(":"): return False parts = line[1:].split() if not parts: return True cmd, *rest = parts if cmd in {"h", "help"}: print( "commands:\n" " :vars list stored variables\n" " :show name [-f] preview digits for a variable; use -f to print full value\n" " :save name path write the digits for a variable to a file (relative to cwd)\n" " :free name free a variable and delete its staged file\n" " :purge free all variables and clear logs directory\n" " :cleanmode [mode] set cleanup mode to 'auto' or 'manual' (default manual)\n" " :stats [path] summarize file counts and bytes (default: ./instance)\n" " :tracked show tracked file counts from sqlite\n" " :active show in-memory StreamNumber handles\n" " :reload reload the sqlite tracker without clearing files\n" " :help this message\n" " :clear / :cls clear the terminal display\n" " exit/quit leave the REPL" ) elif cmd == "vars": if not env: print("(no variables)") else: for name, number in env.items(): print(f"{name:>8} -> {number.path}") elif cmd == "show": if not rest: print("usage: :show name [-f]") else: name = rest[0] force = "-f" in rest[1:] number = env.get(name) if number is None: print(f"[warn] no variable named {name!r}") else: if force: digits, _ = _collect_digits(number, limit=None) print(f"{name} = {digits}") else: preview, truncated = _collect_digits(number, limit=100) if truncated: print( f"[warn] {name} exceeds 100 digits. " "Use ':show name -f' to print the full value." ) else: print(f"{name} = {preview}") elif cmd == "save": if len(rest) != 2: print("usage: :save name path") else: name, filename = rest _save_variable(env, name, filename) elif cmd == "free": if not rest: print("usage: :free name") else: name = rest[0] number = env.pop(name, None) if number is None: print(f"[warn] no variable named {name!r}") else: free_stream(number) print(f"[ok] freed {name}") elif cmd == "purge": for number in env.values(): free_stream(number) env.clear() clear_logs() print("[ok] cleared environment and staging directory") elif cmd == "cleanmode": if not rest: mode = _cleanup_mode_label(manual_free_only_enabled()) print(f"[info] cleanup mode is {mode}") elif len(rest) != 1 or rest[0].lower() not in {"auto", "manual"}: print("usage: :cleanmode [auto|manual]") else: target_mode = rest[0].lower() manual = target_mode == "manual" set_manual_free_only(manual) print(f"[ok] cleanup mode set to {_cleanup_mode_label(manual)}") elif cmd == "stats": target = Path(rest[0]).expanduser() if rest else Path("instance") info = instance_stats(target) print(f"[stats] root={info['root']}") if not info["exists"]: print(" (directory missing)") else: total_files = info["total_files"] total_bytes = info["total_bytes"] log_files = info["log_files"] log_bytes = info["log_bytes"] print(f" files: {total_files} total (log/: {log_files})") print(f" bytes: {total_bytes} total (log/: {log_bytes})") elif cmd in {"clear", "cls"}: _clear_screen() elif cmd == "tracked": tracked = tracked_files() if not tracked: print("(no tracked files)") else: for path, count in tracked.items(): print(f"{path} (ref_count={count})") elif cmd == "active": active = active_streams() if not active: print("(no active streams)") else: for path, count in active.items(): print(f"{path} (python_refs={count})") elif cmd == "reload": print("tracker reload not implemented yet.") else: print(f"[warn] unknown command :{cmd}") return True def _evaluate_line(line: str, env: Dict[str, StreamNumber]) -> Optional[StreamNumber]: try: node = ast.parse(line, mode="exec") except SyntaxError: expr_node = ast.parse(line, mode="eval") result = _eval_expr(expr_node.body, env) return result if len(node.body) != 1 or not isinstance(node.body[0], ast.Assign): raise SyntaxError("only single assignment statements are allowed (e.g. x = 2 + 2)") assign = node.body[0] if len(assign.targets) != 1 or not isinstance(assign.targets[0], ast.Name): raise SyntaxError("left-hand side must be a simple variable name") target = assign.targets[0].id result = _eval_expr(assign.value, env) previous = env.pop(target, None) if previous is not None: free_stream(previous) env[target] = result print(f"[set] {target} = {result.path}") return None def _eval_expr(node: ast.AST, env: Dict[str, StreamNumber]) -> StreamNumber: if isinstance(node, ast.BinOp): left = _eval_expr(node.left, env) right = _eval_expr(node.right, env) if isinstance(node.op, ast.Add): return add(left, right) if isinstance(node.op, ast.Sub): return sub(left, right) if isinstance(node.op, ast.Mult): return mul(left, right) if isinstance(node.op, ast.Div): return div(left, right) if isinstance(node.op, ast.Mod): return mod(left, right) if isinstance(node.op, ast.Pow): return pow(left, right) raise TypeError(f"unsupported operator {ast.dump(node.op)}") if isinstance(node, ast.UnaryOp): operand = _eval_expr(node.operand, env) if isinstance(node.op, ast.UAdd): return operand if isinstance(node.op, ast.USub): zero = StreamNumber(literal="0") result = sub(zero, operand) zero.free() return result raise TypeError("only +x and -x unary operators are supported") if isinstance(node, ast.Name): if node.id not in env: raise NameError(f"{node.id!r} is not defined") return env[node.id] if isinstance(node, ast.Constant): value = node.value if isinstance(value, bool): raise TypeError("boolean literals are not supported") if isinstance(value, int): return StreamNumber(literal=str(value)) if isinstance(value, str): return _coerce_literal_or_path(value) raise TypeError(f"unsupported literal type {type(value)!r}") if isinstance(node, ast.Call): raise TypeError("function calls are not allowed in expressions") raise TypeError(f"unsupported syntax: {ast.dump(node)}") def _coerce_literal_or_path(value: str) -> StreamNumber: candidate = Path(value) if candidate.exists(): return StreamNumber(candidate) return StreamNumber(literal=value) def _preview_digits(number: StreamNumber, limit: int) -> str: remaining = max(0, limit) pieces: list[str] = [] for chunk in number.stream(): if remaining <= 0: break slice_chunk = chunk[:remaining] pieces.append(slice_chunk) remaining -= len(slice_chunk) return "".join(pieces) or "0" def _has_more_digits(number: StreamNumber, limit: int) -> bool: total = 0 for chunk in number.stream(): total += len(chunk) if total > limit: return True return False def _save_variable(env: Dict[str, StreamNumber], name: str, filename: str) -> None: number = env.get(name) if number is None: print(f"[warn] no variable named {name!r}") return digits, _ = _collect_digits(number, limit=None) target = Path(filename) target.parent.mkdir(parents=True, exist_ok=True) target.write_text(digits + "\n", encoding="utf-8") print(f"[ok] saved {name} to {target.resolve()}") def _collect_digits(number: StreamNumber, limit: Optional[int]) -> tuple[str, bool]: remaining = limit pieces: list[str] = [] truncated = False for chunk in number.stream(): if remaining is not None: if remaining <= 0: truncated = True break if len(chunk) > remaining: pieces.append(chunk[:remaining]) truncated = True remaining = 0 break remaining -= len(chunk) pieces.append(chunk) return ("".join(pieces) or "0", truncated) def _clear_screen() -> None: import os os.system("cls" if os.name == "nt" else "clear") def _wipe_instance() -> None: from shutil import rmtree root = Path("instance") if root.exists(): rmtree(root, ignore_errors=True) def _shutdown(env: Dict[str, StreamNumber], exit_code: int, skip_cleanup: bool) -> None: for number in env.values(): free_stream(number) env.clear() set_manual_free_only(False) if not skip_cleanup: _wipe_instance() print("bye have a great time UwU") raise SystemExit(exit_code or 621)