From 88ee579114c4d5b573c1446458943c9f8f2ea1d2 Mon Sep 17 00:00:00 2001 From: Dominik Krenn Date: Wed, 5 Nov 2025 12:20:31 +0100 Subject: [PATCH] added find pi --- find_my.py | 140 +++++++++++++++++++++++++++++++++++++ pi_finder/__init__.py | 9 +++ pi_finder/engine.py | 142 ++++++++++++++++++++++++++++++++++++++ pi_finder/ui/__init__.py | 18 +++++ pi_finder/ui/app.py | 132 +++++++++++++++++++++++++++++++++++ pi_finder/ui/colors.py | 38 ++++++++++ pi_finder/ui/dashboard.py | 58 ++++++++++++++++ pi_finder/ui/views.py | 95 +++++++++++++++++++++++++ 8 files changed, 632 insertions(+) create mode 100644 find_my.py create mode 100644 pi_finder/__init__.py create mode 100644 pi_finder/engine.py create mode 100644 pi_finder/ui/__init__.py create mode 100644 pi_finder/ui/app.py create mode 100644 pi_finder/ui/colors.py create mode 100644 pi_finder/ui/dashboard.py create mode 100644 pi_finder/ui/views.py diff --git a/find_my.py b/find_my.py new file mode 100644 index 0000000..02081e6 --- /dev/null +++ b/find_my.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python3 + +"""Pi explorer powered by mathstream. + +Computes π using the Nilakantha series with streamed big-int arithmetic and +optionally renders the progress in a curses dashboard (`--ui`). +""" + +from __future__ import annotations + +import argparse +from pathlib import Path +from typing import Iterable, Optional + +from mathstream import ( + clear_logs, + manual_free_only_enabled, + set_manual_free_only, +) + +from pi_finder import iterate_nilakantha + + +def run_cli_mode( + *, + digits: int, + iterations: int, + extra_precision: int, + show_steps: bool, +) -> tuple[str, int]: + previous_manual = manual_free_only_enabled() + set_manual_free_only(True) + clear_logs() + + final_value: Optional[str] = None + last_iteration = 0 + try: + for state in iterate_nilakantha( + iterations, + digits=digits, + extra_precision=extra_precision, + ): + final_value = state.digits + last_iteration = state.iteration + if show_steps: + print(f"[iter {state.iteration:02d}] {state.digits}") + + if final_value is None: + final_value = "0" + last_iteration = 0 + if show_steps: + print(final_value) + finally: + set_manual_free_only(previous_manual) + + return final_value, last_iteration + + +def run_ui_mode(max_digits: Optional[int]) -> None: + try: + from pi_finder.ui import launch_pi_ui + except ImportError as exc: # pragma: no cover - import guard + raise SystemExit(f"pi UI is unavailable: {exc}") from exc + launch_pi_ui(max_digits=max_digits) + + +def parse_args(argv: Optional[Iterable[str]] = None) -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Nilakantha-based π explorer using mathstream streamed arithmetic." + ) + parser.add_argument( + "--digits", + type=int, + default=25, + help="Digits of π to keep after the decimal point (default: 25).", + ) + parser.add_argument( + "--iterations", + type=int, + default=10, + help="Nilakantha iterations to run (each adds roughly two decimal digits).", + ) + parser.add_argument( + "--extra", + type=int, + default=4, + help="Extra guard digits maintained during calculation for stability (default: 4).", + ) + parser.add_argument( + "--show-steps", + action="store_true", + help="Print every intermediate approximation instead of just the final value.", + ) + parser.add_argument( + "--output", + type=Path, + default=Path("found.pi"), + help="Optional path to write the final approximation (default: found.pi).", + ) + parser.add_argument( + "--ui", + action="store_true", + help="Launch the curses dashboard instead of printing digits.", + ) + parser.add_argument( + "--max-digits", + type=int, + default=None, + help="Limit digits in UI mode (defaults to --digits).", + ) + return parser.parse_args(list(argv) if argv is not None else None) + + +def main(argv: Optional[Iterable[str]] = None) -> None: + args = parse_args(argv) + if args.ui: + precision = args.max_digits if args.max_digits is not None else max(10, args.digits) + run_ui_mode(precision) + else: + digits = max(0, args.digits) + iterations = max(0, args.iterations) + extra = max(0, args.extra) + result, last_iter = run_cli_mode( + digits=digits, + iterations=iterations, + extra_precision=extra, + show_steps=args.show_steps, + ) + output_path: Path = args.output + output_path.parent.mkdir(parents=True, exist_ok=True) + data = result if result.endswith("\n") else f"{result}\n" + output_path.write_text(data, encoding="utf-8") + digits_written = sum(ch.isdigit() for ch in result) + print( + f"π approximation (iteration {last_iter}, {digits_written} digits) written to {output_path}" + ) + + +if __name__ == "__main__": + main() diff --git a/pi_finder/__init__.py b/pi_finder/__init__.py new file mode 100644 index 0000000..6d24213 --- /dev/null +++ b/pi_finder/__init__.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +from .engine import PiApproximation, compute_pi_text, iterate_nilakantha + +__all__ = [ + "PiApproximation", + "iterate_nilakantha", + "compute_pi_text", +] diff --git a/pi_finder/engine.py b/pi_finder/engine.py new file mode 100644 index 0000000..5d3e2f8 --- /dev/null +++ b/pi_finder/engine.py @@ -0,0 +1,142 @@ +from __future__ import annotations + +import itertools +from dataclasses import dataclass +from pathlib import Path +from typing import Iterable, Iterator + +from mathstream import StreamNumber, add, sub, mul, div +from mathstream.number import LOG_DIR + + +@dataclass +class PiApproximation: + iteration: int + digits: str + + +def _literal(value: int | str) -> StreamNumber: + return StreamNumber(literal=str(value)) + + +_SCALE_CACHE: dict[int, Path] = {} + + +def _scaled_one(scale_power: int) -> StreamNumber: + if scale_power < 0: + raise ValueError("scale_power must be non-negative") + cached = _SCALE_CACHE.get(scale_power) + if cached and cached.exists(): + return StreamNumber(cached) + + LOG_DIR.mkdir(parents=True, exist_ok=True) + path = LOG_DIR / f"scale_1e{scale_power}.txt" + if not path.exists(): + with open(path, "w", encoding="utf-8") as handle: + handle.write("1") + remaining = scale_power + if remaining > 0: + chunk_size = 1_000_000 + zero_chunk = "0" * chunk_size + while remaining > 0: + write_size = min(chunk_size, remaining) + handle.write(zero_chunk[:write_size]) + remaining -= write_size + _SCALE_CACHE[scale_power] = path + return StreamNumber(path) + + +def _format_scaled(value: StreamNumber, scale_power: int, digits: int) -> str: + raw = "".join(value.stream()) + negative = raw.startswith("-") + if negative: + raw = raw[1:] + if len(raw) <= scale_power: + raw = raw.zfill(scale_power + 1) + integer_part = raw[:-scale_power] or "0" + fractional_part = raw[-scale_power:] if scale_power else "" + if digits >= 0 and fractional_part: + fractional_part = fractional_part[:digits] + text = integer_part + if digits != 0: + text = f"{integer_part}.{fractional_part or '0'}" + if negative: + text = f"-{text}" + return text + + +def iterate_nilakantha( + iterations: int | None, + *, + digits: int, + extra_precision: int = 4, +) -> Iterator[PiApproximation]: + """Yield successive Nilakantha approximations of π using mathstream primitives.""" + if iterations is not None and iterations < 0: + raise ValueError("iterations must be non-negative") + if digits < 0: + raise ValueError("digits must be non-negative") + + scale_power = digits + extra_precision + scale = _scaled_one(scale_power) + three = _literal(3) + four = _literal(4) + + total = mul(three, scale) + numerator = mul(four, scale) + + try: + yield PiApproximation(0, _format_scaled(total, scale_power, digits)) + + counter: Iterable[int] + if iterations is None: + counter = itertools.count(1) + else: + counter = range(1, iterations + 1) + + for idx in counter: + two_n = _literal(2 * idx) + two_n_plus_one = _literal(2 * idx + 1) + two_n_plus_two = _literal(2 * idx + 2) + + product = mul(two_n, two_n_plus_one) + denominator = mul(product, two_n_plus_two) + term = div(numerator, denominator) + + updated = add(total, term) if idx % 2 else sub(total, term) + + total.free() + total = updated + + yield PiApproximation(idx, _format_scaled(total, scale_power, digits)) + + for number in ( + two_n, + two_n_plus_one, + two_n_plus_two, + product, + denominator, + term, + ): + number.free() + + finally: + for number in (total, numerator, scale, three, four): + number.free() + + +def compute_pi_text( + iterations: int, + *, + digits: int, + extra_precision: int = 4, +) -> str: + """Return the final Nilakantha approximation after the requested iterations.""" + approximation: str = "0" + for state in iterate_nilakantha( + iterations, + digits=digits, + extra_precision=extra_precision, + ): + approximation = state.digits + return approximation diff --git a/pi_finder/ui/__init__.py b/pi_finder/ui/__init__.py new file mode 100644 index 0000000..c164db0 --- /dev/null +++ b/pi_finder/ui/__init__.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +import curses +from functools import partial +from typing import Optional + +from .app import PiApp + + +def launch_pi_ui(max_digits: Optional[int] = None) -> None: + """Launch the curses dashboard for streaming π digits.""" + wrapper = partial(_run_curses_app, max_digits=max_digits) + curses.wrapper(wrapper) # type: ignore[arg-type] + + +def _run_curses_app(stdscr: "curses._CursesWindow", max_digits: Optional[int]) -> None: + app = PiApp(stdscr, max_digits=max_digits) + app.run() diff --git a/pi_finder/ui/app.py b/pi_finder/ui/app.py new file mode 100644 index 0000000..483aefb --- /dev/null +++ b/pi_finder/ui/app.py @@ -0,0 +1,132 @@ +from __future__ import annotations + +import curses +import threading +import time +from dataclasses import dataclass +from typing import Optional + +from mathstream import clear_logs, manual_free_only_enabled, set_manual_free_only + +from pi_finder.engine import iterate_nilakantha + +from .dashboard import PiDashboard + + +@dataclass +class SharedState: + digits_text: str = "0" + digits_count: int = 0 + last_digit: Optional[str] = None + computing: bool = False + iteration: int = 0 + + +class PiApp: + """Controller coordinating the π spigot worker and curses dashboard.""" + + def __init__( + self, + stdscr: "curses._CursesWindow", + *, + max_digits: Optional[int], + ) -> None: + self.stdscr = stdscr + self.dashboard = PiDashboard(stdscr) + requested = max_digits if max_digits is not None else 50 + self.precision = max(1, requested) + + self.state = SharedState() + self._state_lock = threading.Lock() + self._stop_event = threading.Event() + self._worker: Optional[threading.Thread] = None + self._start_time = time.time() + + def run(self) -> None: + self._start_time = time.time() + self._worker = threading.Thread(target=self._worker_loop, daemon=True) + self._worker.start() + try: + self._ui_loop() + finally: + self._stop_event.set() + if self._worker: + self._worker.join() + + def _ui_loop(self) -> None: + while True: + snapshot = self._snapshot() + elapsed = time.time() - self._start_time + rate = snapshot.digits_count / elapsed if elapsed > 0 else 0.0 + self.dashboard.render( + snapshot.digits_text, + snapshot.digits_count, + elapsed, + rate, + snapshot.last_digit, + snapshot.computing, + snapshot.iteration, + ) + + ch = self.stdscr.getch() + if ch == ord("q"): + self._stop_event.set() + break + if ch != -1: + self.dashboard.handle_input(ch) + if self._stop_event.is_set(): + break + time.sleep(0.05) + + def _worker_loop(self) -> None: + previous_manual = manual_free_only_enabled() + set_manual_free_only(True) + clear_logs() + generator = None + try: + generator = iterate_nilakantha( + iterations=None, + digits=self.precision, + extra_precision=4, + ) + + for state in generator: + if self._stop_event.is_set(): + break + + with self._state_lock: + self.state.computing = True + + calc_start = time.time() + digits_text = state.digits + calc_duration = time.time() - calc_start + + numeric_chars = [ch for ch in digits_text if ch.isdigit()] + last_digit = numeric_chars[-1] if numeric_chars else None + + with self._state_lock: + self.state.digits_text = digits_text + self.state.digits_count = len(numeric_chars) + self.state.last_digit = last_digit + self.state.iteration = state.iteration + self.state.computing = False + + if calc_duration < 0.05: + time.sleep(0.05) + finally: + if generator is not None: + generator.close() + set_manual_free_only(previous_manual) + + with self._state_lock: + self.state.computing = False + + def _snapshot(self) -> SharedState: + with self._state_lock: + return SharedState( + digits_text=self.state.digits_text, + digits_count=self.state.digits_count, + last_digit=self.state.last_digit, + computing=self.state.computing, + iteration=self.state.iteration, + ) diff --git a/pi_finder/ui/colors.py b/pi_finder/ui/colors.py new file mode 100644 index 0000000..4e19420 --- /dev/null +++ b/pi_finder/ui/colors.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import curses + + +class UIColors: + """Shared color palette for the π dashboard.""" + + HEADER = 1 + SEPARATOR = 2 + NUMBER = 3 + STATUS = 4 + WARNING = 5 + SUCCESS = 6 + METRIC = 7 + + @classmethod + def setup(cls) -> None: + if not curses.has_colors(): + return + curses.start_color() + curses.use_default_colors() + curses.init_pair(cls.HEADER, curses.COLOR_CYAN, -1) + curses.init_pair(cls.SEPARATOR, curses.COLOR_BLUE, -1) + curses.init_pair(cls.NUMBER, curses.COLOR_WHITE, -1) + curses.init_pair(cls.STATUS, curses.COLOR_MAGENTA, -1) + curses.init_pair(cls.WARNING, curses.COLOR_RED, -1) + curses.init_pair(cls.SUCCESS, curses.COLOR_GREEN, -1) + curses.init_pair(cls.METRIC, curses.COLOR_YELLOW, -1) + + @staticmethod + def style(pair: int, *, bold: bool = False, dim: bool = False) -> int: + attr = curses.color_pair(pair) + if bold: + attr |= curses.A_BOLD + if dim: + attr |= curses.A_DIM + return attr diff --git a/pi_finder/ui/dashboard.py b/pi_finder/ui/dashboard.py new file mode 100644 index 0000000..0a203d9 --- /dev/null +++ b/pi_finder/ui/dashboard.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +import curses + +from .colors import UIColors +from .views import DigitsView, HeaderView, HelpView + + +class PiDashboard: + """Coordinates curses windows for the π streaming UI.""" + + def __init__(self, stdscr: "curses._CursesWindow") -> None: + self.stdscr = stdscr + try: + curses.curs_set(0) + except curses.error: + pass + self.stdscr.nodelay(True) + self.stdscr.timeout(100) + UIColors.setup() + + lines, cols = curses.LINES, curses.COLS + header_h = 6 + help_h = 1 + digits_h = max(1, lines - header_h - help_h - 1) + + self.header = HeaderView(header_h, cols, 0, 0) + self.digits = DigitsView(digits_h, cols, header_h, 0) + self.help = HelpView(header_h + digits_h, 0, cols) + self.scroll = 0 + + def render( + self, + digits_text: str, + digits_count: int, + elapsed: float, + rate: float, + last_digit: str | None, + computing: bool, + iteration: int, + ) -> None: + self.header.render( + iteration, + digits_count, + elapsed, + rate, + last_digit, + computing, + ) + self.scroll = self.digits.render(digits_text, self.scroll) + self.help.render() + curses.doupdate() + + def handle_input(self, ch: int) -> None: + if ch == curses.KEY_UP: + self.scroll = max(0, self.scroll - 1) + elif ch == curses.KEY_DOWN: + self.scroll += 1 diff --git a/pi_finder/ui/views.py b/pi_finder/ui/views.py new file mode 100644 index 0000000..d19a8dd --- /dev/null +++ b/pi_finder/ui/views.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import curses +from typing import Optional + +from .colors import UIColors + + +class HeaderView: + """Top panel with live metrics.""" + + def __init__(self, height: int, width: int, y: int, x: int) -> None: + self.win = curses.newwin(height, width, y, x) + + def render( + self, + iteration: int, + digits: int, + elapsed: float, + rate: float, + last_digit: Optional[str], + computing: bool, + ) -> None: + self.win.erase() + style = UIColors.style(UIColors.HEADER, bold=True) + metric_style = UIColors.style(UIColors.METRIC, bold=True) + status_style = UIColors.style(UIColors.STATUS) + + lines = [ + " π Stream Dashboard ", + f" Iteration: {iteration:,}", + f" Digits: {digits:,}", + f" Elapsed: {elapsed:8.2f}s • Rate: {rate:8.2f} digits/s", + ] + status = " Generating…" if computing else " Idle" + digit_line = f" Last digit: {last_digit if last_digit is not None else '—'}" + + try: + self.win.addstr(0, 0, lines[0], style) + self.win.addstr(1, 0, lines[1], metric_style) + self.win.addstr(2, 0, lines[2], metric_style) + self.win.addstr(3, 0, lines[3], metric_style) + self.win.addstr(4, 0, digit_line, metric_style) + self.win.addstr(5, 0, status, status_style) + width = max(1, self.win.getmaxyx()[1] - 1) + self.win.addstr(6, 0, "─" * width, UIColors.style(UIColors.SEPARATOR)) + except curses.error: + pass + + self.win.noutrefresh() + + +class DigitsView: + """Scrollable window for the π digits.""" + + def __init__(self, height: int, width: int, y: int, x: int) -> None: + self.win = curses.newwin(height, width, y, x) + + def render(self, digits_text: str, scroll: int) -> int: + self.win.erase() + max_y, max_x = self.win.getmaxyx() + if max_y <= 0 or max_x <= 0: + self.win.noutrefresh() + return 0 + + cols = max(1, max_x - 1) + chunks = [digits_text[i : i + cols] for i in range(0, len(digits_text), cols)] or ["0"] + + max_scroll = max(0, len(chunks) - max_y) + scroll = max(0, min(scroll, max_scroll)) + style = UIColors.style(UIColors.NUMBER) + + for idx, chunk in enumerate(chunks[scroll : scroll + max_y]): + try: + self.win.addstr(idx, 0, chunk, style) + except curses.error: + break + + self.win.noutrefresh() + return scroll + + +class HelpView: + """Bottom row with key hints.""" + + def __init__(self, y: int, x: int, width: int) -> None: + self.win = curses.newwin(1, width, y, x) + + def render(self) -> None: + message = " ↑↓ scroll • q quit" + try: + self.win.addstr(0, 0, message, UIColors.style(UIColors.STATUS, dim=True)) + except curses.error: + pass + self.win.noutrefresh()