198 lines
5.9 KiB
Python
198 lines
5.9 KiB
Python
from __future__ import annotations
|
|
|
|
import curses
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from mathstream import StreamNumber, add, mul, div, is_even, clear_logs
|
|
|
|
from .colors import UIColors
|
|
from .dashboard import CollatzDashboard
|
|
|
|
|
|
LOG_DIR = Path("instance/log")
|
|
|
|
|
|
def collatz_step(
|
|
n: StreamNumber,
|
|
three: StreamNumber,
|
|
two: StreamNumber,
|
|
one: StreamNumber,
|
|
) -> StreamNumber:
|
|
"""Compute the next Collatz value."""
|
|
return div(n, two) if is_even(n) else add(mul(n, three), one)
|
|
|
|
|
|
@dataclass
|
|
class SharedState:
|
|
step: int = 0
|
|
digits: str = "0"
|
|
delta_sign: int = 0
|
|
computing: bool = False
|
|
calc_start_time: Optional[float] = None
|
|
last_calc_duration: float = 0.0
|
|
reached_one: bool = False
|
|
|
|
|
|
class CollatzApp:
|
|
"""Main application loop driving the Collatz visualization."""
|
|
|
|
def __init__(self, stdscr: "curses._CursesWindow") -> None:
|
|
self.stdscr = stdscr
|
|
self.dashboard = CollatzDashboard(stdscr, LOG_DIR)
|
|
self.start_time = time.time()
|
|
self.state = SharedState()
|
|
self._state_lock = threading.Lock()
|
|
self._stop_event = threading.Event()
|
|
self._worker: Optional[threading.Thread] = None
|
|
self._error: Optional[BaseException] = None
|
|
|
|
def run(self) -> None:
|
|
start_file = Path("start.txt")
|
|
if not start_file.exists():
|
|
self.dashboard.show_message(
|
|
"Missing start.txt — create one with your starting number.",
|
|
UIColors.DOWN,
|
|
)
|
|
return
|
|
|
|
clear_logs()
|
|
n = StreamNumber(start_file)
|
|
one, two, three = (StreamNumber(literal=s) for s in ("1", "2", "3"))
|
|
|
|
initial_digits = "".join(n.stream()) or "0"
|
|
with self._state_lock:
|
|
self.state.digits = initial_digits
|
|
self.state.delta_sign = 0
|
|
self.state.step = 0
|
|
|
|
self.start_time = time.time()
|
|
self._worker = threading.Thread(
|
|
target=self._worker_loop,
|
|
args=(n, one, two, three),
|
|
daemon=True,
|
|
)
|
|
self._worker.start()
|
|
|
|
try:
|
|
self._ui_loop()
|
|
finally:
|
|
self._stop_event.set()
|
|
if self._worker:
|
|
self._worker.join()
|
|
|
|
def _ui_loop(self) -> None:
|
|
while True:
|
|
snapshot = self._snapshot_state()
|
|
elapsed = time.time() - self.start_time
|
|
avg_step = elapsed / snapshot.step if snapshot.step else 0.0
|
|
if snapshot.computing and snapshot.calc_start_time is not None:
|
|
current_calc = time.time() - snapshot.calc_start_time
|
|
else:
|
|
current_calc = 0.0
|
|
last_calc = snapshot.last_calc_duration
|
|
|
|
digits = snapshot.digits or "0"
|
|
delta = snapshot.delta_sign
|
|
|
|
self.dashboard.render(
|
|
snapshot.step,
|
|
elapsed,
|
|
avg_step,
|
|
digits,
|
|
delta,
|
|
current_calc,
|
|
last_calc,
|
|
snapshot.computing,
|
|
)
|
|
|
|
if self._error:
|
|
self.dashboard.show_message(
|
|
f"Error: {self._error}", UIColors.DOWN
|
|
)
|
|
break
|
|
|
|
ch = self.stdscr.getch()
|
|
if ch == ord("q"):
|
|
self._stop_event.set()
|
|
break
|
|
if ch != -1:
|
|
self.dashboard.handle_input(ch)
|
|
|
|
if snapshot.reached_one:
|
|
self.dashboard.show_message(
|
|
"Reached 1 — press any key to exit.", UIColors.UP
|
|
)
|
|
break
|
|
|
|
if self._stop_event.is_set():
|
|
break
|
|
|
|
time.sleep(0.05)
|
|
|
|
def _snapshot_state(self) -> SharedState:
|
|
with self._state_lock:
|
|
return SharedState(
|
|
step=self.state.step,
|
|
digits=self.state.digits,
|
|
delta_sign=self.state.delta_sign,
|
|
computing=self.state.computing,
|
|
calc_start_time=self.state.calc_start_time,
|
|
last_calc_duration=self.state.last_calc_duration,
|
|
reached_one=self.state.reached_one,
|
|
)
|
|
|
|
def _worker_loop(
|
|
self,
|
|
current: StreamNumber,
|
|
one: StreamNumber,
|
|
two: StreamNumber,
|
|
three: StreamNumber,
|
|
) -> None:
|
|
while not self._stop_event.is_set():
|
|
with self._state_lock:
|
|
self.state.computing = True
|
|
self.state.calc_start_time = time.time()
|
|
|
|
even_step = is_even(current)
|
|
delta_sign = -1 if even_step else 1
|
|
calc_started = time.time()
|
|
|
|
try:
|
|
next_n = collatz_step(current, three, two, one)
|
|
digits = "".join(next_n.stream()) or "0"
|
|
except Exception as exc: # pragma: no cover - safeguard for worker errors
|
|
self._error = exc
|
|
with self._state_lock:
|
|
self.state.computing = False
|
|
self.state.calc_start_time = None
|
|
self._stop_event.set()
|
|
return
|
|
|
|
duration = time.time() - calc_started
|
|
|
|
with self._state_lock:
|
|
self.state.step += 1
|
|
self.state.digits = digits
|
|
self.state.delta_sign = delta_sign
|
|
self.state.last_calc_duration = duration
|
|
self.state.computing = False
|
|
self.state.calc_start_time = None
|
|
if digits == "1":
|
|
self.state.reached_one = True
|
|
|
|
current.free(delete_file=False)
|
|
current = next_n
|
|
|
|
if digits == "1":
|
|
self._stop_event.set()
|
|
return
|
|
|
|
|
|
def run_collatz(stdscr: "curses._CursesWindow") -> None:
|
|
"""Entry point expected by curses.wrapper."""
|
|
CollatzApp(stdscr).run()
|