from __future__ import annotations import curses import threading import time from dataclasses import dataclass from typing import Optional from mathstream import clear_logs, manual_free_only_enabled, set_manual_free_only from pi_finder.engine import iterate_nilakantha from .dashboard import PiDashboard @dataclass class SharedState: digits_text: str = "0" digits_count: int = 0 last_digit: Optional[str] = None computing: bool = False iteration: int = 0 class PiApp: """Controller coordinating the π spigot worker and curses dashboard.""" def __init__( self, stdscr: "curses._CursesWindow", *, max_digits: Optional[int], ) -> None: self.stdscr = stdscr self.dashboard = PiDashboard(stdscr) requested = max_digits if max_digits is not None else 50 self.precision = max(1, requested) self.state = SharedState() self._state_lock = threading.Lock() self._stop_event = threading.Event() self._worker: Optional[threading.Thread] = None self._start_time = time.time() def run(self) -> None: self._start_time = time.time() self._worker = threading.Thread(target=self._worker_loop, daemon=True) self._worker.start() try: self._ui_loop() finally: self._stop_event.set() if self._worker: self._worker.join() def _ui_loop(self) -> None: while True: snapshot = self._snapshot() elapsed = time.time() - self._start_time rate = snapshot.digits_count / elapsed if elapsed > 0 else 0.0 self.dashboard.render( snapshot.digits_text, snapshot.digits_count, elapsed, rate, snapshot.last_digit, snapshot.computing, snapshot.iteration, ) ch = self.stdscr.getch() if ch == ord("q"): self._stop_event.set() break if ch != -1: self.dashboard.handle_input(ch) if self._stop_event.is_set(): break time.sleep(0.05) def _worker_loop(self) -> None: previous_manual = manual_free_only_enabled() set_manual_free_only(True) clear_logs() generator = None try: generator = iterate_nilakantha( iterations=None, digits=self.precision, extra_precision=4, ) for state in generator: if self._stop_event.is_set(): break with self._state_lock: self.state.computing = True calc_start = time.time() digits_text = state.digits calc_duration = time.time() - calc_start numeric_chars = [ch for ch in digits_text if ch.isdigit()] last_digit = numeric_chars[-1] if numeric_chars else None with self._state_lock: self.state.digits_text = digits_text self.state.digits_count = len(numeric_chars) self.state.last_digit = last_digit self.state.iteration = state.iteration self.state.computing = False if calc_duration < 0.05: time.sleep(0.05) finally: if generator is not None: generator.close() set_manual_free_only(previous_manual) with self._state_lock: self.state.computing = False def _snapshot(self) -> SharedState: with self._state_lock: return SharedState( digits_text=self.state.digits_text, digits_count=self.state.digits_count, last_digit=self.state.last_digit, computing=self.state.computing, iteration=self.state.iteration, )